(2.10) RAPI: Make use of request_body in Reboot/Remove
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import logging
33 import errno
34 import time
35 import weakref
36 import threading
37 import itertools
38 import operator
39
40 try:
41   # pylint: disable=E0611
42   from pyinotify import pyinotify
43 except ImportError:
44   import pyinotify
45
46 from ganeti import asyncnotifier
47 from ganeti import constants
48 from ganeti import serializer
49 from ganeti import workerpool
50 from ganeti import locking
51 from ganeti import opcodes
52 from ganeti import errors
53 from ganeti import mcpu
54 from ganeti import utils
55 from ganeti import jstore
56 from ganeti import rpc
57 from ganeti import runtime
58 from ganeti import netutils
59 from ganeti import compat
60 from ganeti import ht
61 from ganeti import query
62 from ganeti import qlang
63 from ganeti import pathutils
64 from ganeti import vcluster
65
66
67 JOBQUEUE_THREADS = 25
68
69 # member lock names to be passed to @ssynchronized decorator
70 _LOCK = "_lock"
71 _QUEUE = "_queue"
72
73 #: Retrieves "id" attribute
74 _GetIdAttr = operator.attrgetter("id")
75
76
77 class CancelJob(Exception):
78   """Special exception to cancel a job.
79
80   """
81
82
83 class QueueShutdown(Exception):
84   """Special exception to abort a job when the job queue is shutting down.
85
86   """
87
88
89 def TimeStampNow():
90   """Returns the current timestamp.
91
92   @rtype: tuple
93   @return: the current time in the (seconds, microseconds) format
94
95   """
96   return utils.SplitTime(time.time())
97
98
99 def _CallJqUpdate(runner, names, file_name, content):
100   """Updates job queue file after virtualizing filename.
101
102   """
103   virt_file_name = vcluster.MakeVirtualPath(file_name)
104   return runner.call_jobqueue_update(names, virt_file_name, content)
105
106
107 class _SimpleJobQuery:
108   """Wrapper for job queries.
109
110   Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.
111
112   """
113   def __init__(self, fields):
114     """Initializes this class.
115
116     """
117     self._query = query.Query(query.JOB_FIELDS, fields)
118
119   def __call__(self, job):
120     """Executes a job query using cached field list.
121
122     """
123     return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
124
125
126 class _QueuedOpCode(object):
127   """Encapsulates an opcode object.
128
129   @ivar log: holds the execution log and consists of tuples
130   of the form C{(log_serial, timestamp, level, message)}
131   @ivar input: the OpCode we encapsulate
132   @ivar status: the current status
133   @ivar result: the result of the LU execution
134   @ivar start_timestamp: timestamp for the start of the execution
135   @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
136   @ivar stop_timestamp: timestamp for the end of the execution
137
138   """
139   __slots__ = ["input", "status", "result", "log", "priority",
140                "start_timestamp", "exec_timestamp", "end_timestamp",
141                "__weakref__"]
142
143   def __init__(self, op):
144     """Initializes instances of this class.
145
146     @type op: L{opcodes.OpCode}
147     @param op: the opcode we encapsulate
148
149     """
150     self.input = op
151     self.status = constants.OP_STATUS_QUEUED
152     self.result = None
153     self.log = []
154     self.start_timestamp = None
155     self.exec_timestamp = None
156     self.end_timestamp = None
157
158     # Get initial priority (it might change during the lifetime of this opcode)
159     self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
160
161   @classmethod
162   def Restore(cls, state):
163     """Restore the _QueuedOpCode from the serialized form.
164
165     @type state: dict
166     @param state: the serialized state
167     @rtype: _QueuedOpCode
168     @return: a new _QueuedOpCode instance
169
170     """
171     obj = _QueuedOpCode.__new__(cls)
172     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
173     obj.status = state["status"]
174     obj.result = state["result"]
175     obj.log = state["log"]
176     obj.start_timestamp = state.get("start_timestamp", None)
177     obj.exec_timestamp = state.get("exec_timestamp", None)
178     obj.end_timestamp = state.get("end_timestamp", None)
179     obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
180     return obj
181
182   def Serialize(self):
183     """Serializes this _QueuedOpCode.
184
185     @rtype: dict
186     @return: the dictionary holding the serialized state
187
188     """
189     return {
190       "input": self.input.__getstate__(),
191       "status": self.status,
192       "result": self.result,
193       "log": self.log,
194       "start_timestamp": self.start_timestamp,
195       "exec_timestamp": self.exec_timestamp,
196       "end_timestamp": self.end_timestamp,
197       "priority": self.priority,
198       }
199
200
201 class _QueuedJob(object):
202   """In-memory job representation.
203
204   This is what we use to track the user-submitted jobs. Locking must
205   be taken care of by users of this class.
206
207   @type queue: L{JobQueue}
208   @ivar queue: the parent queue
209   @ivar id: the job ID
210   @type ops: list
211   @ivar ops: the list of _QueuedOpCode that constitute the job
212   @type log_serial: int
213   @ivar log_serial: holds the index for the next log entry
214   @ivar received_timestamp: the timestamp for when the job was received
215   @ivar start_timestmap: the timestamp for start of execution
216   @ivar end_timestamp: the timestamp for end of execution
217   @ivar writable: Whether the job is allowed to be modified
218
219   """
220   # pylint: disable=W0212
221   __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
222                "received_timestamp", "start_timestamp", "end_timestamp",
223                "__weakref__", "processor_lock", "writable", "archived"]
224
225   def _AddReasons(self):
226     """Extend the reason trail
227
228     Add the reason for all the opcodes of this job to be executed.
229
230     """
231     count = 0
232     for queued_op in self.ops:
233       op = queued_op.input
234       reason_src = opcodes.NameToReasonSrc(op.__class__.__name__)
235       reason_text = "job=%d;index=%d" % (self.id, count)
236       reason = getattr(op, "reason", [])
237       reason.append((reason_src, reason_text, utils.EpochNano()))
238       op.reason = reason
239       count = count + 1
240
241   def __init__(self, queue, job_id, ops, writable):
242     """Constructor for the _QueuedJob.
243
244     @type queue: L{JobQueue}
245     @param queue: our parent queue
246     @type job_id: job_id
247     @param job_id: our job id
248     @type ops: list
249     @param ops: the list of opcodes we hold, which will be encapsulated
250         in _QueuedOpCodes
251     @type writable: bool
252     @param writable: Whether job can be modified
253
254     """
255     if not ops:
256       raise errors.GenericError("A job needs at least one opcode")
257
258     self.queue = queue
259     self.id = int(job_id)
260     self.ops = [_QueuedOpCode(op) for op in ops]
261     self._AddReasons()
262     self.log_serial = 0
263     self.received_timestamp = TimeStampNow()
264     self.start_timestamp = None
265     self.end_timestamp = None
266     self.archived = False
267
268     self._InitInMemory(self, writable)
269
270     assert not self.archived, "New jobs can not be marked as archived"
271
272   @staticmethod
273   def _InitInMemory(obj, writable):
274     """Initializes in-memory variables.
275
276     """
277     obj.writable = writable
278     obj.ops_iter = None
279     obj.cur_opctx = None
280
281     # Read-only jobs are not processed and therefore don't need a lock
282     if writable:
283       obj.processor_lock = threading.Lock()
284     else:
285       obj.processor_lock = None
286
287   def __repr__(self):
288     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
289               "id=%s" % self.id,
290               "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
291
292     return "<%s at %#x>" % (" ".join(status), id(self))
293
294   @classmethod
295   def Restore(cls, queue, state, writable, archived):
296     """Restore a _QueuedJob from serialized state:
297
298     @type queue: L{JobQueue}
299     @param queue: to which queue the restored job belongs
300     @type state: dict
301     @param state: the serialized state
302     @type writable: bool
303     @param writable: Whether job can be modified
304     @type archived: bool
305     @param archived: Whether job was already archived
306     @rtype: _JobQueue
307     @return: the restored _JobQueue instance
308
309     """
310     obj = _QueuedJob.__new__(cls)
311     obj.queue = queue
312     obj.id = int(state["id"])
313     obj.received_timestamp = state.get("received_timestamp", None)
314     obj.start_timestamp = state.get("start_timestamp", None)
315     obj.end_timestamp = state.get("end_timestamp", None)
316     obj.archived = archived
317
318     obj.ops = []
319     obj.log_serial = 0
320     for op_state in state["ops"]:
321       op = _QueuedOpCode.Restore(op_state)
322       for log_entry in op.log:
323         obj.log_serial = max(obj.log_serial, log_entry[0])
324       obj.ops.append(op)
325
326     cls._InitInMemory(obj, writable)
327
328     return obj
329
330   def Serialize(self):
331     """Serialize the _JobQueue instance.
332
333     @rtype: dict
334     @return: the serialized state
335
336     """
337     return {
338       "id": self.id,
339       "ops": [op.Serialize() for op in self.ops],
340       "start_timestamp": self.start_timestamp,
341       "end_timestamp": self.end_timestamp,
342       "received_timestamp": self.received_timestamp,
343       }
344
345   def CalcStatus(self):
346     """Compute the status of this job.
347
348     This function iterates over all the _QueuedOpCodes in the job and
349     based on their status, computes the job status.
350
351     The algorithm is:
352       - if we find a cancelled, or finished with error, the job
353         status will be the same
354       - otherwise, the last opcode with the status one of:
355           - waitlock
356           - canceling
357           - running
358
359         will determine the job status
360
361       - otherwise, it means either all opcodes are queued, or success,
362         and the job status will be the same
363
364     @return: the job status
365
366     """
367     status = constants.JOB_STATUS_QUEUED
368
369     all_success = True
370     for op in self.ops:
371       if op.status == constants.OP_STATUS_SUCCESS:
372         continue
373
374       all_success = False
375
376       if op.status == constants.OP_STATUS_QUEUED:
377         pass
378       elif op.status == constants.OP_STATUS_WAITING:
379         status = constants.JOB_STATUS_WAITING
380       elif op.status == constants.OP_STATUS_RUNNING:
381         status = constants.JOB_STATUS_RUNNING
382       elif op.status == constants.OP_STATUS_CANCELING:
383         status = constants.JOB_STATUS_CANCELING
384         break
385       elif op.status == constants.OP_STATUS_ERROR:
386         status = constants.JOB_STATUS_ERROR
387         # The whole job fails if one opcode failed
388         break
389       elif op.status == constants.OP_STATUS_CANCELED:
390         status = constants.OP_STATUS_CANCELED
391         break
392
393     if all_success:
394       status = constants.JOB_STATUS_SUCCESS
395
396     return status
397
398   def CalcPriority(self):
399     """Gets the current priority for this job.
400
401     Only unfinished opcodes are considered. When all are done, the default
402     priority is used.
403
404     @rtype: int
405
406     """
407     priorities = [op.priority for op in self.ops
408                   if op.status not in constants.OPS_FINALIZED]
409
410     if not priorities:
411       # All opcodes are done, assume default priority
412       return constants.OP_PRIO_DEFAULT
413
414     return min(priorities)
415
416   def GetLogEntries(self, newer_than):
417     """Selectively returns the log entries.
418
419     @type newer_than: None or int
420     @param newer_than: if this is None, return all log entries,
421         otherwise return only the log entries with serial higher
422         than this value
423     @rtype: list
424     @return: the list of the log entries selected
425
426     """
427     if newer_than is None:
428       serial = -1
429     else:
430       serial = newer_than
431
432     entries = []
433     for op in self.ops:
434       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
435
436     return entries
437
438   def GetInfo(self, fields):
439     """Returns information about a job.
440
441     @type fields: list
442     @param fields: names of fields to return
443     @rtype: list
444     @return: list with one element for each field
445     @raise errors.OpExecError: when an invalid field
446         has been passed
447
448     """
449     return _SimpleJobQuery(fields)(self)
450
451   def MarkUnfinishedOps(self, status, result):
452     """Mark unfinished opcodes with a given status and result.
453
454     This is an utility function for marking all running or waiting to
455     be run opcodes with a given status. Opcodes which are already
456     finalised are not changed.
457
458     @param status: a given opcode status
459     @param result: the opcode result
460
461     """
462     not_marked = True
463     for op in self.ops:
464       if op.status in constants.OPS_FINALIZED:
465         assert not_marked, "Finalized opcodes found after non-finalized ones"
466         continue
467       op.status = status
468       op.result = result
469       not_marked = False
470
471   def Finalize(self):
472     """Marks the job as finalized.
473
474     """
475     self.end_timestamp = TimeStampNow()
476
477   def Cancel(self):
478     """Marks job as canceled/-ing if possible.
479
480     @rtype: tuple; (bool, string)
481     @return: Boolean describing whether job was successfully canceled or marked
482       as canceling and a text message
483
484     """
485     status = self.CalcStatus()
486
487     if status == constants.JOB_STATUS_QUEUED:
488       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
489                              "Job canceled by request")
490       self.Finalize()
491       return (True, "Job %s canceled" % self.id)
492
493     elif status == constants.JOB_STATUS_WAITING:
494       # The worker will notice the new status and cancel the job
495       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
496       return (True, "Job %s will be canceled" % self.id)
497
498     else:
499       logging.debug("Job %s is no longer waiting in the queue", self.id)
500       return (False, "Job %s is no longer waiting in the queue" % self.id)
501
502   def ChangePriority(self, priority):
503     """Changes the job priority.
504
505     @type priority: int
506     @param priority: New priority
507     @rtype: tuple; (bool, string)
508     @return: Boolean describing whether job's priority was successfully changed
509       and a text message
510
511     """
512     status = self.CalcStatus()
513
514     if status in constants.JOBS_FINALIZED:
515       return (False, "Job %s is finished" % self.id)
516     elif status == constants.JOB_STATUS_CANCELING:
517       return (False, "Job %s is cancelling" % self.id)
518     else:
519       assert status in (constants.JOB_STATUS_QUEUED,
520                         constants.JOB_STATUS_WAITING,
521                         constants.JOB_STATUS_RUNNING)
522
523       changed = False
524       for op in self.ops:
525         if (op.status == constants.OP_STATUS_RUNNING or
526             op.status in constants.OPS_FINALIZED):
527           assert not changed, \
528             ("Found opcode for which priority should not be changed after"
529              " priority has been changed for previous opcodes")
530           continue
531
532         assert op.status in (constants.OP_STATUS_QUEUED,
533                              constants.OP_STATUS_WAITING)
534
535         changed = True
536
537         # Set new priority (doesn't modify opcode input)
538         op.priority = priority
539
540       if changed:
541         return (True, ("Priorities of pending opcodes for job %s have been"
542                        " changed to %s" % (self.id, priority)))
543       else:
544         return (False, "Job %s had no pending opcodes" % self.id)
545
546
547 class _OpExecCallbacks(mcpu.OpExecCbBase):
548   def __init__(self, queue, job, op):
549     """Initializes this class.
550
551     @type queue: L{JobQueue}
552     @param queue: Job queue
553     @type job: L{_QueuedJob}
554     @param job: Job object
555     @type op: L{_QueuedOpCode}
556     @param op: OpCode
557
558     """
559     assert queue, "Queue is missing"
560     assert job, "Job is missing"
561     assert op, "Opcode is missing"
562
563     self._queue = queue
564     self._job = job
565     self._op = op
566
567   def _CheckCancel(self):
568     """Raises an exception to cancel the job if asked to.
569
570     """
571     # Cancel here if we were asked to
572     if self._op.status == constants.OP_STATUS_CANCELING:
573       logging.debug("Canceling opcode")
574       raise CancelJob()
575
576     # See if queue is shutting down
577     if not self._queue.AcceptingJobsUnlocked():
578       logging.debug("Queue is shutting down")
579       raise QueueShutdown()
580
581   @locking.ssynchronized(_QUEUE, shared=1)
582   def NotifyStart(self):
583     """Mark the opcode as running, not lock-waiting.
584
585     This is called from the mcpu code as a notifier function, when the LU is
586     finally about to start the Exec() method. Of course, to have end-user
587     visible results, the opcode must be initially (before calling into
588     Processor.ExecOpCode) set to OP_STATUS_WAITING.
589
590     """
591     assert self._op in self._job.ops
592     assert self._op.status in (constants.OP_STATUS_WAITING,
593                                constants.OP_STATUS_CANCELING)
594
595     # Cancel here if we were asked to
596     self._CheckCancel()
597
598     logging.debug("Opcode is now running")
599
600     self._op.status = constants.OP_STATUS_RUNNING
601     self._op.exec_timestamp = TimeStampNow()
602
603     # And finally replicate the job status
604     self._queue.UpdateJobUnlocked(self._job)
605
606   @locking.ssynchronized(_QUEUE, shared=1)
607   def _AppendFeedback(self, timestamp, log_type, log_msg):
608     """Internal feedback append function, with locks
609
610     """
611     self._job.log_serial += 1
612     self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
613     self._queue.UpdateJobUnlocked(self._job, replicate=False)
614
615   def Feedback(self, *args):
616     """Append a log entry.
617
618     """
619     assert len(args) < 3
620
621     if len(args) == 1:
622       log_type = constants.ELOG_MESSAGE
623       log_msg = args[0]
624     else:
625       (log_type, log_msg) = args
626
627     # The time is split to make serialization easier and not lose
628     # precision.
629     timestamp = utils.SplitTime(time.time())
630     self._AppendFeedback(timestamp, log_type, log_msg)
631
632   def CurrentPriority(self):
633     """Returns current priority for opcode.
634
635     """
636     assert self._op.status in (constants.OP_STATUS_WAITING,
637                                constants.OP_STATUS_CANCELING)
638
639     # Cancel here if we were asked to
640     self._CheckCancel()
641
642     return self._op.priority
643
644   def SubmitManyJobs(self, jobs):
645     """Submits jobs for processing.
646
647     See L{JobQueue.SubmitManyJobs}.
648
649     """
650     # Locking is done in job queue
651     return self._queue.SubmitManyJobs(jobs)
652
653
654 class _JobChangesChecker(object):
655   def __init__(self, fields, prev_job_info, prev_log_serial):
656     """Initializes this class.
657
658     @type fields: list of strings
659     @param fields: Fields requested by LUXI client
660     @type prev_job_info: string
661     @param prev_job_info: previous job info, as passed by the LUXI client
662     @type prev_log_serial: string
663     @param prev_log_serial: previous job serial, as passed by the LUXI client
664
665     """
666     self._squery = _SimpleJobQuery(fields)
667     self._prev_job_info = prev_job_info
668     self._prev_log_serial = prev_log_serial
669
670   def __call__(self, job):
671     """Checks whether job has changed.
672
673     @type job: L{_QueuedJob}
674     @param job: Job object
675
676     """
677     assert not job.writable, "Expected read-only job"
678
679     status = job.CalcStatus()
680     job_info = self._squery(job)
681     log_entries = job.GetLogEntries(self._prev_log_serial)
682
683     # Serializing and deserializing data can cause type changes (e.g. from
684     # tuple to list) or precision loss. We're doing it here so that we get
685     # the same modifications as the data received from the client. Without
686     # this, the comparison afterwards might fail without the data being
687     # significantly different.
688     # TODO: we just deserialized from disk, investigate how to make sure that
689     # the job info and log entries are compatible to avoid this further step.
690     # TODO: Doing something like in testutils.py:UnifyValueType might be more
691     # efficient, though floats will be tricky
692     job_info = serializer.LoadJson(serializer.DumpJson(job_info))
693     log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
694
695     # Don't even try to wait if the job is no longer running, there will be
696     # no changes.
697     if (status not in (constants.JOB_STATUS_QUEUED,
698                        constants.JOB_STATUS_RUNNING,
699                        constants.JOB_STATUS_WAITING) or
700         job_info != self._prev_job_info or
701         (log_entries and self._prev_log_serial != log_entries[0][0])):
702       logging.debug("Job %s changed", job.id)
703       return (job_info, log_entries)
704
705     return None
706
707
708 class _JobFileChangesWaiter(object):
709   def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
710     """Initializes this class.
711
712     @type filename: string
713     @param filename: Path to job file
714     @raises errors.InotifyError: if the notifier cannot be setup
715
716     """
717     self._wm = _inotify_wm_cls()
718     self._inotify_handler = \
719       asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
720     self._notifier = \
721       pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
722     try:
723       self._inotify_handler.enable()
724     except Exception:
725       # pyinotify doesn't close file descriptors automatically
726       self._notifier.stop()
727       raise
728
729   def _OnInotify(self, notifier_enabled):
730     """Callback for inotify.
731
732     """
733     if not notifier_enabled:
734       self._inotify_handler.enable()
735
736   def Wait(self, timeout):
737     """Waits for the job file to change.
738
739     @type timeout: float
740     @param timeout: Timeout in seconds
741     @return: Whether there have been events
742
743     """
744     assert timeout >= 0
745     have_events = self._notifier.check_events(timeout * 1000)
746     if have_events:
747       self._notifier.read_events()
748     self._notifier.process_events()
749     return have_events
750
751   def Close(self):
752     """Closes underlying notifier and its file descriptor.
753
754     """
755     self._notifier.stop()
756
757
758 class _JobChangesWaiter(object):
759   def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
760     """Initializes this class.
761
762     @type filename: string
763     @param filename: Path to job file
764
765     """
766     self._filewaiter = None
767     self._filename = filename
768     self._waiter_cls = _waiter_cls
769
770   def Wait(self, timeout):
771     """Waits for a job to change.
772
773     @type timeout: float
774     @param timeout: Timeout in seconds
775     @return: Whether there have been events
776
777     """
778     if self._filewaiter:
779       return self._filewaiter.Wait(timeout)
780
781     # Lazy setup: Avoid inotify setup cost when job file has already changed.
782     # If this point is reached, return immediately and let caller check the job
783     # file again in case there were changes since the last check. This avoids a
784     # race condition.
785     self._filewaiter = self._waiter_cls(self._filename)
786
787     return True
788
789   def Close(self):
790     """Closes underlying waiter.
791
792     """
793     if self._filewaiter:
794       self._filewaiter.Close()
795
796
797 class _WaitForJobChangesHelper(object):
798   """Helper class using inotify to wait for changes in a job file.
799
800   This class takes a previous job status and serial, and alerts the client when
801   the current job status has changed.
802
803   """
804   @staticmethod
805   def _CheckForChanges(counter, job_load_fn, check_fn):
806     if counter.next() > 0:
807       # If this isn't the first check the job is given some more time to change
808       # again. This gives better performance for jobs generating many
809       # changes/messages.
810       time.sleep(0.1)
811
812     job = job_load_fn()
813     if not job:
814       raise errors.JobLost()
815
816     result = check_fn(job)
817     if result is None:
818       raise utils.RetryAgain()
819
820     return result
821
822   def __call__(self, filename, job_load_fn,
823                fields, prev_job_info, prev_log_serial, timeout,
824                _waiter_cls=_JobChangesWaiter):
825     """Waits for changes on a job.
826
827     @type filename: string
828     @param filename: File on which to wait for changes
829     @type job_load_fn: callable
830     @param job_load_fn: Function to load job
831     @type fields: list of strings
832     @param fields: Which fields to check for changes
833     @type prev_job_info: list or None
834     @param prev_job_info: Last job information returned
835     @type prev_log_serial: int
836     @param prev_log_serial: Last job message serial number
837     @type timeout: float
838     @param timeout: maximum time to wait in seconds
839
840     """
841     counter = itertools.count()
842     try:
843       check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
844       waiter = _waiter_cls(filename)
845       try:
846         return utils.Retry(compat.partial(self._CheckForChanges,
847                                           counter, job_load_fn, check_fn),
848                            utils.RETRY_REMAINING_TIME, timeout,
849                            wait_fn=waiter.Wait)
850       finally:
851         waiter.Close()
852     except errors.JobLost:
853       return None
854     except utils.RetryTimeout:
855       return constants.JOB_NOTCHANGED
856
857
858 def _EncodeOpError(err):
859   """Encodes an error which occurred while processing an opcode.
860
861   """
862   if isinstance(err, errors.GenericError):
863     to_encode = err
864   else:
865     to_encode = errors.OpExecError(str(err))
866
867   return errors.EncodeException(to_encode)
868
869
870 class _TimeoutStrategyWrapper:
871   def __init__(self, fn):
872     """Initializes this class.
873
874     """
875     self._fn = fn
876     self._next = None
877
878   def _Advance(self):
879     """Gets the next timeout if necessary.
880
881     """
882     if self._next is None:
883       self._next = self._fn()
884
885   def Peek(self):
886     """Returns the next timeout.
887
888     """
889     self._Advance()
890     return self._next
891
892   def Next(self):
893     """Returns the current timeout and advances the internal state.
894
895     """
896     self._Advance()
897     result = self._next
898     self._next = None
899     return result
900
901
902 class _OpExecContext:
903   def __init__(self, op, index, log_prefix, timeout_strategy_factory):
904     """Initializes this class.
905
906     """
907     self.op = op
908     self.index = index
909     self.log_prefix = log_prefix
910     self.summary = op.input.Summary()
911
912     # Create local copy to modify
913     if getattr(op.input, opcodes.DEPEND_ATTR, None):
914       self.jobdeps = op.input.depends[:]
915     else:
916       self.jobdeps = None
917
918     self._timeout_strategy_factory = timeout_strategy_factory
919     self._ResetTimeoutStrategy()
920
921   def _ResetTimeoutStrategy(self):
922     """Creates a new timeout strategy.
923
924     """
925     self._timeout_strategy = \
926       _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
927
928   def CheckPriorityIncrease(self):
929     """Checks whether priority can and should be increased.
930
931     Called when locks couldn't be acquired.
932
933     """
934     op = self.op
935
936     # Exhausted all retries and next round should not use blocking acquire
937     # for locks?
938     if (self._timeout_strategy.Peek() is None and
939         op.priority > constants.OP_PRIO_HIGHEST):
940       logging.debug("Increasing priority")
941       op.priority -= 1
942       self._ResetTimeoutStrategy()
943       return True
944
945     return False
946
947   def GetNextLockTimeout(self):
948     """Returns the next lock acquire timeout.
949
950     """
951     return self._timeout_strategy.Next()
952
953
954 class _JobProcessor(object):
955   (DEFER,
956    WAITDEP,
957    FINISHED) = range(1, 4)
958
959   def __init__(self, queue, opexec_fn, job,
960                _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
961     """Initializes this class.
962
963     """
964     self.queue = queue
965     self.opexec_fn = opexec_fn
966     self.job = job
967     self._timeout_strategy_factory = _timeout_strategy_factory
968
969   @staticmethod
970   def _FindNextOpcode(job, timeout_strategy_factory):
971     """Locates the next opcode to run.
972
973     @type job: L{_QueuedJob}
974     @param job: Job object
975     @param timeout_strategy_factory: Callable to create new timeout strategy
976
977     """
978     # Create some sort of a cache to speed up locating next opcode for future
979     # lookups
980     # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
981     # pending and one for processed ops.
982     if job.ops_iter is None:
983       job.ops_iter = enumerate(job.ops)
984
985     # Find next opcode to run
986     while True:
987       try:
988         (idx, op) = job.ops_iter.next()
989       except StopIteration:
990         raise errors.ProgrammerError("Called for a finished job")
991
992       if op.status == constants.OP_STATUS_RUNNING:
993         # Found an opcode already marked as running
994         raise errors.ProgrammerError("Called for job marked as running")
995
996       opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
997                              timeout_strategy_factory)
998
999       if op.status not in constants.OPS_FINALIZED:
1000         return opctx
1001
1002       # This is a job that was partially completed before master daemon
1003       # shutdown, so it can be expected that some opcodes are already
1004       # completed successfully (if any did error out, then the whole job
1005       # should have been aborted and not resubmitted for processing).
1006       logging.info("%s: opcode %s already processed, skipping",
1007                    opctx.log_prefix, opctx.summary)
1008
1009   @staticmethod
1010   def _MarkWaitlock(job, op):
1011     """Marks an opcode as waiting for locks.
1012
1013     The job's start timestamp is also set if necessary.
1014
1015     @type job: L{_QueuedJob}
1016     @param job: Job object
1017     @type op: L{_QueuedOpCode}
1018     @param op: Opcode object
1019
1020     """
1021     assert op in job.ops
1022     assert op.status in (constants.OP_STATUS_QUEUED,
1023                          constants.OP_STATUS_WAITING)
1024
1025     update = False
1026
1027     op.result = None
1028
1029     if op.status == constants.OP_STATUS_QUEUED:
1030       op.status = constants.OP_STATUS_WAITING
1031       update = True
1032
1033     if op.start_timestamp is None:
1034       op.start_timestamp = TimeStampNow()
1035       update = True
1036
1037     if job.start_timestamp is None:
1038       job.start_timestamp = op.start_timestamp
1039       update = True
1040
1041     assert op.status == constants.OP_STATUS_WAITING
1042
1043     return update
1044
1045   @staticmethod
1046   def _CheckDependencies(queue, job, opctx):
1047     """Checks if an opcode has dependencies and if so, processes them.
1048
1049     @type queue: L{JobQueue}
1050     @param queue: Queue object
1051     @type job: L{_QueuedJob}
1052     @param job: Job object
1053     @type opctx: L{_OpExecContext}
1054     @param opctx: Opcode execution context
1055     @rtype: bool
1056     @return: Whether opcode will be re-scheduled by dependency tracker
1057
1058     """
1059     op = opctx.op
1060
1061     result = False
1062
1063     while opctx.jobdeps:
1064       (dep_job_id, dep_status) = opctx.jobdeps[0]
1065
1066       (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
1067                                                           dep_status)
1068       assert ht.TNonEmptyString(depmsg), "No dependency message"
1069
1070       logging.info("%s: %s", opctx.log_prefix, depmsg)
1071
1072       if depresult == _JobDependencyManager.CONTINUE:
1073         # Remove dependency and continue
1074         opctx.jobdeps.pop(0)
1075
1076       elif depresult == _JobDependencyManager.WAIT:
1077         # Need to wait for notification, dependency tracker will re-add job
1078         # to workerpool
1079         result = True
1080         break
1081
1082       elif depresult == _JobDependencyManager.CANCEL:
1083         # Job was cancelled, cancel this job as well
1084         job.Cancel()
1085         assert op.status == constants.OP_STATUS_CANCELING
1086         break
1087
1088       elif depresult in (_JobDependencyManager.WRONGSTATUS,
1089                          _JobDependencyManager.ERROR):
1090         # Job failed or there was an error, this job must fail
1091         op.status = constants.OP_STATUS_ERROR
1092         op.result = _EncodeOpError(errors.OpExecError(depmsg))
1093         break
1094
1095       else:
1096         raise errors.ProgrammerError("Unknown dependency result '%s'" %
1097                                      depresult)
1098
1099     return result
1100
1101   def _ExecOpCodeUnlocked(self, opctx):
1102     """Processes one opcode and returns the result.
1103
1104     """
1105     op = opctx.op
1106
1107     assert op.status == constants.OP_STATUS_WAITING
1108
1109     timeout = opctx.GetNextLockTimeout()
1110
1111     try:
1112       # Make sure not to hold queue lock while calling ExecOpCode
1113       result = self.opexec_fn(op.input,
1114                               _OpExecCallbacks(self.queue, self.job, op),
1115                               timeout=timeout)
1116     except mcpu.LockAcquireTimeout:
1117       assert timeout is not None, "Received timeout for blocking acquire"
1118       logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1119
1120       assert op.status in (constants.OP_STATUS_WAITING,
1121                            constants.OP_STATUS_CANCELING)
1122
1123       # Was job cancelled while we were waiting for the lock?
1124       if op.status == constants.OP_STATUS_CANCELING:
1125         return (constants.OP_STATUS_CANCELING, None)
1126
1127       # Queue is shutting down, return to queued
1128       if not self.queue.AcceptingJobsUnlocked():
1129         return (constants.OP_STATUS_QUEUED, None)
1130
1131       # Stay in waitlock while trying to re-acquire lock
1132       return (constants.OP_STATUS_WAITING, None)
1133     except CancelJob:
1134       logging.exception("%s: Canceling job", opctx.log_prefix)
1135       assert op.status == constants.OP_STATUS_CANCELING
1136       return (constants.OP_STATUS_CANCELING, None)
1137
1138     except QueueShutdown:
1139       logging.exception("%s: Queue is shutting down", opctx.log_prefix)
1140
1141       assert op.status == constants.OP_STATUS_WAITING
1142
1143       # Job hadn't been started yet, so it should return to the queue
1144       return (constants.OP_STATUS_QUEUED, None)
1145
1146     except Exception, err: # pylint: disable=W0703
1147       logging.exception("%s: Caught exception in %s",
1148                         opctx.log_prefix, opctx.summary)
1149       return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1150     else:
1151       logging.debug("%s: %s successful",
1152                     opctx.log_prefix, opctx.summary)
1153       return (constants.OP_STATUS_SUCCESS, result)
1154
1155   def __call__(self, _nextop_fn=None):
1156     """Continues execution of a job.
1157
1158     @param _nextop_fn: Callback function for tests
1159     @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1160       be deferred and C{WAITDEP} if the dependency manager
1161       (L{_JobDependencyManager}) will re-schedule the job when appropriate
1162
1163     """
1164     queue = self.queue
1165     job = self.job
1166
1167     logging.debug("Processing job %s", job.id)
1168
1169     queue.acquire(shared=1)
1170     try:
1171       opcount = len(job.ops)
1172
1173       assert job.writable, "Expected writable job"
1174
1175       # Don't do anything for finalized jobs
1176       if job.CalcStatus() in constants.JOBS_FINALIZED:
1177         return self.FINISHED
1178
1179       # Is a previous opcode still pending?
1180       if job.cur_opctx:
1181         opctx = job.cur_opctx
1182         job.cur_opctx = None
1183       else:
1184         if __debug__ and _nextop_fn:
1185           _nextop_fn()
1186         opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1187
1188       op = opctx.op
1189
1190       # Consistency check
1191       assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1192                                      constants.OP_STATUS_CANCELING)
1193                         for i in job.ops[opctx.index + 1:])
1194
1195       assert op.status in (constants.OP_STATUS_QUEUED,
1196                            constants.OP_STATUS_WAITING,
1197                            constants.OP_STATUS_CANCELING)
1198
1199       assert (op.priority <= constants.OP_PRIO_LOWEST and
1200               op.priority >= constants.OP_PRIO_HIGHEST)
1201
1202       waitjob = None
1203
1204       if op.status != constants.OP_STATUS_CANCELING:
1205         assert op.status in (constants.OP_STATUS_QUEUED,
1206                              constants.OP_STATUS_WAITING)
1207
1208         # Prepare to start opcode
1209         if self._MarkWaitlock(job, op):
1210           # Write to disk
1211           queue.UpdateJobUnlocked(job)
1212
1213         assert op.status == constants.OP_STATUS_WAITING
1214         assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1215         assert job.start_timestamp and op.start_timestamp
1216         assert waitjob is None
1217
1218         # Check if waiting for a job is necessary
1219         waitjob = self._CheckDependencies(queue, job, opctx)
1220
1221         assert op.status in (constants.OP_STATUS_WAITING,
1222                              constants.OP_STATUS_CANCELING,
1223                              constants.OP_STATUS_ERROR)
1224
1225         if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1226                                          constants.OP_STATUS_ERROR)):
1227           logging.info("%s: opcode %s waiting for locks",
1228                        opctx.log_prefix, opctx.summary)
1229
1230           assert not opctx.jobdeps, "Not all dependencies were removed"
1231
1232           queue.release()
1233           try:
1234             (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1235           finally:
1236             queue.acquire(shared=1)
1237
1238           op.status = op_status
1239           op.result = op_result
1240
1241           assert not waitjob
1242
1243         if op.status in (constants.OP_STATUS_WAITING,
1244                          constants.OP_STATUS_QUEUED):
1245           # waiting: Couldn't get locks in time
1246           # queued: Queue is shutting down
1247           assert not op.end_timestamp
1248         else:
1249           # Finalize opcode
1250           op.end_timestamp = TimeStampNow()
1251
1252           if op.status == constants.OP_STATUS_CANCELING:
1253             assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1254                                   for i in job.ops[opctx.index:])
1255           else:
1256             assert op.status in constants.OPS_FINALIZED
1257
1258       if op.status == constants.OP_STATUS_QUEUED:
1259         # Queue is shutting down
1260         assert not waitjob
1261
1262         finalize = False
1263
1264         # Reset context
1265         job.cur_opctx = None
1266
1267         # In no case must the status be finalized here
1268         assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1269
1270       elif op.status == constants.OP_STATUS_WAITING or waitjob:
1271         finalize = False
1272
1273         if not waitjob and opctx.CheckPriorityIncrease():
1274           # Priority was changed, need to update on-disk file
1275           queue.UpdateJobUnlocked(job)
1276
1277         # Keep around for another round
1278         job.cur_opctx = opctx
1279
1280         assert (op.priority <= constants.OP_PRIO_LOWEST and
1281                 op.priority >= constants.OP_PRIO_HIGHEST)
1282
1283         # In no case must the status be finalized here
1284         assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1285
1286       else:
1287         # Ensure all opcodes so far have been successful
1288         assert (opctx.index == 0 or
1289                 compat.all(i.status == constants.OP_STATUS_SUCCESS
1290                            for i in job.ops[:opctx.index]))
1291
1292         # Reset context
1293         job.cur_opctx = None
1294
1295         if op.status == constants.OP_STATUS_SUCCESS:
1296           finalize = False
1297
1298         elif op.status == constants.OP_STATUS_ERROR:
1299           # Ensure failed opcode has an exception as its result
1300           assert errors.GetEncodedError(job.ops[opctx.index].result)
1301
1302           to_encode = errors.OpExecError("Preceding opcode failed")
1303           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1304                                 _EncodeOpError(to_encode))
1305           finalize = True
1306
1307           # Consistency check
1308           assert compat.all(i.status == constants.OP_STATUS_ERROR and
1309                             errors.GetEncodedError(i.result)
1310                             for i in job.ops[opctx.index:])
1311
1312         elif op.status == constants.OP_STATUS_CANCELING:
1313           job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1314                                 "Job canceled by request")
1315           finalize = True
1316
1317         else:
1318           raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1319
1320         if opctx.index == (opcount - 1):
1321           # Finalize on last opcode
1322           finalize = True
1323
1324         if finalize:
1325           # All opcodes have been run, finalize job
1326           job.Finalize()
1327
1328         # Write to disk. If the job status is final, this is the final write
1329         # allowed. Once the file has been written, it can be archived anytime.
1330         queue.UpdateJobUnlocked(job)
1331
1332         assert not waitjob
1333
1334         if finalize:
1335           logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1336           return self.FINISHED
1337
1338       assert not waitjob or queue.depmgr.JobWaiting(job)
1339
1340       if waitjob:
1341         return self.WAITDEP
1342       else:
1343         return self.DEFER
1344     finally:
1345       assert job.writable, "Job became read-only while being processed"
1346       queue.release()
1347
1348
1349 def _EvaluateJobProcessorResult(depmgr, job, result):
1350   """Looks at a result from L{_JobProcessor} for a job.
1351
1352   To be used in a L{_JobQueueWorker}.
1353
1354   """
1355   if result == _JobProcessor.FINISHED:
1356     # Notify waiting jobs
1357     depmgr.NotifyWaiters(job.id)
1358
1359   elif result == _JobProcessor.DEFER:
1360     # Schedule again
1361     raise workerpool.DeferTask(priority=job.CalcPriority())
1362
1363   elif result == _JobProcessor.WAITDEP:
1364     # No-op, dependency manager will re-schedule
1365     pass
1366
1367   else:
1368     raise errors.ProgrammerError("Job processor returned unknown status %s" %
1369                                  (result, ))
1370
1371
1372 class _JobQueueWorker(workerpool.BaseWorker):
1373   """The actual job workers.
1374
1375   """
1376   def RunTask(self, job): # pylint: disable=W0221
1377     """Job executor.
1378
1379     @type job: L{_QueuedJob}
1380     @param job: the job to be processed
1381
1382     """
1383     assert job.writable, "Expected writable job"
1384
1385     # Ensure only one worker is active on a single job. If a job registers for
1386     # a dependency job, and the other job notifies before the first worker is
1387     # done, the job can end up in the tasklist more than once.
1388     job.processor_lock.acquire()
1389     try:
1390       return self._RunTaskInner(job)
1391     finally:
1392       job.processor_lock.release()
1393
1394   def _RunTaskInner(self, job):
1395     """Executes a job.
1396
1397     Must be called with per-job lock acquired.
1398
1399     """
1400     queue = job.queue
1401     assert queue == self.pool.queue
1402
1403     setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1404     setname_fn(None)
1405
1406     proc = mcpu.Processor(queue.context, job.id)
1407
1408     # Create wrapper for setting thread name
1409     wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1410                                     proc.ExecOpCode)
1411
1412     _EvaluateJobProcessorResult(queue.depmgr, job,
1413                                 _JobProcessor(queue, wrap_execop_fn, job)())
1414
1415   @staticmethod
1416   def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1417     """Updates the worker thread name to include a short summary of the opcode.
1418
1419     @param setname_fn: Callable setting worker thread name
1420     @param execop_fn: Callable for executing opcode (usually
1421                       L{mcpu.Processor.ExecOpCode})
1422
1423     """
1424     setname_fn(op)
1425     try:
1426       return execop_fn(op, *args, **kwargs)
1427     finally:
1428       setname_fn(None)
1429
1430   @staticmethod
1431   def _GetWorkerName(job, op):
1432     """Sets the worker thread name.
1433
1434     @type job: L{_QueuedJob}
1435     @type op: L{opcodes.OpCode}
1436
1437     """
1438     parts = ["Job%s" % job.id]
1439
1440     if op:
1441       parts.append(op.TinySummary())
1442
1443     return "/".join(parts)
1444
1445
1446 class _JobQueueWorkerPool(workerpool.WorkerPool):
1447   """Simple class implementing a job-processing workerpool.
1448
1449   """
1450   def __init__(self, queue):
1451     super(_JobQueueWorkerPool, self).__init__("Jq",
1452                                               JOBQUEUE_THREADS,
1453                                               _JobQueueWorker)
1454     self.queue = queue
1455
1456
1457 class _JobDependencyManager:
1458   """Keeps track of job dependencies.
1459
1460   """
1461   (WAIT,
1462    ERROR,
1463    CANCEL,
1464    CONTINUE,
1465    WRONGSTATUS) = range(1, 6)
1466
1467   def __init__(self, getstatus_fn, enqueue_fn):
1468     """Initializes this class.
1469
1470     """
1471     self._getstatus_fn = getstatus_fn
1472     self._enqueue_fn = enqueue_fn
1473
1474     self._waiters = {}
1475     self._lock = locking.SharedLock("JobDepMgr")
1476
1477   @locking.ssynchronized(_LOCK, shared=1)
1478   def GetLockInfo(self, requested): # pylint: disable=W0613
1479     """Retrieves information about waiting jobs.
1480
1481     @type requested: set
1482     @param requested: Requested information, see C{query.LQ_*}
1483
1484     """
1485     # No need to sort here, that's being done by the lock manager and query
1486     # library. There are no priorities for notifying jobs, hence all show up as
1487     # one item under "pending".
1488     return [("job/%s" % job_id, None, None,
1489              [("job", [job.id for job in waiters])])
1490             for job_id, waiters in self._waiters.items()
1491             if waiters]
1492
1493   @locking.ssynchronized(_LOCK, shared=1)
1494   def JobWaiting(self, job):
1495     """Checks if a job is waiting.
1496
1497     """
1498     return compat.any(job in jobs
1499                       for jobs in self._waiters.values())
1500
1501   @locking.ssynchronized(_LOCK)
1502   def CheckAndRegister(self, job, dep_job_id, dep_status):
1503     """Checks if a dependency job has the requested status.
1504
1505     If the other job is not yet in a finalized status, the calling job will be
1506     notified (re-added to the workerpool) at a later point.
1507
1508     @type job: L{_QueuedJob}
1509     @param job: Job object
1510     @type dep_job_id: int
1511     @param dep_job_id: ID of dependency job
1512     @type dep_status: list
1513     @param dep_status: Required status
1514
1515     """
1516     assert ht.TJobId(job.id)
1517     assert ht.TJobId(dep_job_id)
1518     assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1519
1520     if job.id == dep_job_id:
1521       return (self.ERROR, "Job can't depend on itself")
1522
1523     # Get status of dependency job
1524     try:
1525       status = self._getstatus_fn(dep_job_id)
1526     except errors.JobLost, err:
1527       return (self.ERROR, "Dependency error: %s" % err)
1528
1529     assert status in constants.JOB_STATUS_ALL
1530
1531     job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1532
1533     if status not in constants.JOBS_FINALIZED:
1534       # Register for notification and wait for job to finish
1535       job_id_waiters.add(job)
1536       return (self.WAIT,
1537               "Need to wait for job %s, wanted status '%s'" %
1538               (dep_job_id, dep_status))
1539
1540     # Remove from waiters list
1541     if job in job_id_waiters:
1542       job_id_waiters.remove(job)
1543
1544     if (status == constants.JOB_STATUS_CANCELED and
1545         constants.JOB_STATUS_CANCELED not in dep_status):
1546       return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1547
1548     elif not dep_status or status in dep_status:
1549       return (self.CONTINUE,
1550               "Dependency job %s finished with status '%s'" %
1551               (dep_job_id, status))
1552
1553     else:
1554       return (self.WRONGSTATUS,
1555               "Dependency job %s finished with status '%s',"
1556               " not one of '%s' as required" %
1557               (dep_job_id, status, utils.CommaJoin(dep_status)))
1558
1559   def _RemoveEmptyWaitersUnlocked(self):
1560     """Remove all jobs without actual waiters.
1561
1562     """
1563     for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1564                    if not waiters]:
1565       del self._waiters[job_id]
1566
1567   def NotifyWaiters(self, job_id):
1568     """Notifies all jobs waiting for a certain job ID.
1569
1570     @attention: Do not call until L{CheckAndRegister} returned a status other
1571       than C{WAITDEP} for C{job_id}, or behaviour is undefined
1572     @type job_id: int
1573     @param job_id: Job ID
1574
1575     """
1576     assert ht.TJobId(job_id)
1577
1578     self._lock.acquire()
1579     try:
1580       self._RemoveEmptyWaitersUnlocked()
1581
1582       jobs = self._waiters.pop(job_id, None)
1583     finally:
1584       self._lock.release()
1585
1586     if jobs:
1587       # Re-add jobs to workerpool
1588       logging.debug("Re-adding %s jobs which were waiting for job %s",
1589                     len(jobs), job_id)
1590       self._enqueue_fn(jobs)
1591
1592
1593 def _RequireOpenQueue(fn):
1594   """Decorator for "public" functions.
1595
1596   This function should be used for all 'public' functions. That is,
1597   functions usually called from other classes. Note that this should
1598   be applied only to methods (not plain functions), since it expects
1599   that the decorated function is called with a first argument that has
1600   a '_queue_filelock' argument.
1601
1602   @warning: Use this decorator only after locking.ssynchronized
1603
1604   Example::
1605     @locking.ssynchronized(_LOCK)
1606     @_RequireOpenQueue
1607     def Example(self):
1608       pass
1609
1610   """
1611   def wrapper(self, *args, **kwargs):
1612     # pylint: disable=W0212
1613     assert self._queue_filelock is not None, "Queue should be open"
1614     return fn(self, *args, **kwargs)
1615   return wrapper
1616
1617
1618 def _RequireNonDrainedQueue(fn):
1619   """Decorator checking for a non-drained queue.
1620
1621   To be used with functions submitting new jobs.
1622
1623   """
1624   def wrapper(self, *args, **kwargs):
1625     """Wrapper function.
1626
1627     @raise errors.JobQueueDrainError: if the job queue is marked for draining
1628
1629     """
1630     # Ok when sharing the big job queue lock, as the drain file is created when
1631     # the lock is exclusive.
1632     # Needs access to protected member, pylint: disable=W0212
1633     if self._drained:
1634       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1635
1636     if not self._accepting_jobs:
1637       raise errors.JobQueueError("Job queue is shutting down, refusing job")
1638
1639     return fn(self, *args, **kwargs)
1640   return wrapper
1641
1642
1643 class JobQueue(object):
1644   """Queue used to manage the jobs.
1645
1646   """
1647   def __init__(self, context):
1648     """Constructor for JobQueue.
1649
1650     The constructor will initialize the job queue object and then
1651     start loading the current jobs from disk, either for starting them
1652     (if they were queue) or for aborting them (if they were already
1653     running).
1654
1655     @type context: GanetiContext
1656     @param context: the context object for access to the configuration
1657         data and other ganeti objects
1658
1659     """
1660     self.context = context
1661     self._memcache = weakref.WeakValueDictionary()
1662     self._my_hostname = netutils.Hostname.GetSysName()
1663
1664     # The Big JobQueue lock. If a code block or method acquires it in shared
1665     # mode safe it must guarantee concurrency with all the code acquiring it in
1666     # shared mode, including itself. In order not to acquire it at all
1667     # concurrency must be guaranteed with all code acquiring it in shared mode
1668     # and all code acquiring it exclusively.
1669     self._lock = locking.SharedLock("JobQueue")
1670
1671     self.acquire = self._lock.acquire
1672     self.release = self._lock.release
1673
1674     # Accept jobs by default
1675     self._accepting_jobs = True
1676
1677     # Initialize the queue, and acquire the filelock.
1678     # This ensures no other process is working on the job queue.
1679     self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1680
1681     # Read serial file
1682     self._last_serial = jstore.ReadSerial()
1683     assert self._last_serial is not None, ("Serial file was modified between"
1684                                            " check in jstore and here")
1685
1686     # Get initial list of nodes
1687     self._nodes = dict((n.name, n.primary_ip)
1688                        for n in self.context.cfg.GetAllNodesInfo().values()
1689                        if n.master_candidate)
1690
1691     # Remove master node
1692     self._nodes.pop(self._my_hostname, None)
1693
1694     # TODO: Check consistency across nodes
1695
1696     self._queue_size = None
1697     self._UpdateQueueSizeUnlocked()
1698     assert ht.TInt(self._queue_size)
1699     self._drained = jstore.CheckDrainFlag()
1700
1701     # Job dependencies
1702     self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1703                                         self._EnqueueJobs)
1704     self.context.glm.AddToLockMonitor(self.depmgr)
1705
1706     # Setup worker pool
1707     self._wpool = _JobQueueWorkerPool(self)
1708     try:
1709       self._InspectQueue()
1710     except:
1711       self._wpool.TerminateWorkers()
1712       raise
1713
1714   @locking.ssynchronized(_LOCK)
1715   @_RequireOpenQueue
1716   def _InspectQueue(self):
1717     """Loads the whole job queue and resumes unfinished jobs.
1718
1719     This function needs the lock here because WorkerPool.AddTask() may start a
1720     job while we're still doing our work.
1721
1722     """
1723     logging.info("Inspecting job queue")
1724
1725     restartjobs = []
1726
1727     all_job_ids = self._GetJobIDsUnlocked()
1728     jobs_count = len(all_job_ids)
1729     lastinfo = time.time()
1730     for idx, job_id in enumerate(all_job_ids):
1731       # Give an update every 1000 jobs or 10 seconds
1732       if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1733           idx == (jobs_count - 1)):
1734         logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1735                      idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1736         lastinfo = time.time()
1737
1738       job = self._LoadJobUnlocked(job_id)
1739
1740       # a failure in loading the job can cause 'None' to be returned
1741       if job is None:
1742         continue
1743
1744       status = job.CalcStatus()
1745
1746       if status == constants.JOB_STATUS_QUEUED:
1747         restartjobs.append(job)
1748
1749       elif status in (constants.JOB_STATUS_RUNNING,
1750                       constants.JOB_STATUS_WAITING,
1751                       constants.JOB_STATUS_CANCELING):
1752         logging.warning("Unfinished job %s found: %s", job.id, job)
1753
1754         if status == constants.JOB_STATUS_WAITING:
1755           # Restart job
1756           job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1757           restartjobs.append(job)
1758         else:
1759           to_encode = errors.OpExecError("Unclean master daemon shutdown")
1760           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1761                                 _EncodeOpError(to_encode))
1762           job.Finalize()
1763
1764         self.UpdateJobUnlocked(job)
1765
1766     if restartjobs:
1767       logging.info("Restarting %s jobs", len(restartjobs))
1768       self._EnqueueJobsUnlocked(restartjobs)
1769
1770     logging.info("Job queue inspection finished")
1771
1772   def _GetRpc(self, address_list):
1773     """Gets RPC runner with context.
1774
1775     """
1776     return rpc.JobQueueRunner(self.context, address_list)
1777
1778   @locking.ssynchronized(_LOCK)
1779   @_RequireOpenQueue
1780   def AddNode(self, node):
1781     """Register a new node with the queue.
1782
1783     @type node: L{objects.Node}
1784     @param node: the node object to be added
1785
1786     """
1787     node_name = node.name
1788     assert node_name != self._my_hostname
1789
1790     # Clean queue directory on added node
1791     result = self._GetRpc(None).call_jobqueue_purge(node_name)
1792     msg = result.fail_msg
1793     if msg:
1794       logging.warning("Cannot cleanup queue directory on node %s: %s",
1795                       node_name, msg)
1796
1797     if not node.master_candidate:
1798       # remove if existing, ignoring errors
1799       self._nodes.pop(node_name, None)
1800       # and skip the replication of the job ids
1801       return
1802
1803     # Upload the whole queue excluding archived jobs
1804     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1805
1806     # Upload current serial file
1807     files.append(pathutils.JOB_QUEUE_SERIAL_FILE)
1808
1809     # Static address list
1810     addrs = [node.primary_ip]
1811
1812     for file_name in files:
1813       # Read file content
1814       content = utils.ReadFile(file_name)
1815
1816       result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
1817                              file_name, content)
1818       msg = result[node_name].fail_msg
1819       if msg:
1820         logging.error("Failed to upload file %s to node %s: %s",
1821                       file_name, node_name, msg)
1822
1823     # Set queue drained flag
1824     result = \
1825       self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name],
1826                                                        self._drained)
1827     msg = result[node_name].fail_msg
1828     if msg:
1829       logging.error("Failed to set queue drained flag on node %s: %s",
1830                     node_name, msg)
1831
1832     self._nodes[node_name] = node.primary_ip
1833
1834   @locking.ssynchronized(_LOCK)
1835   @_RequireOpenQueue
1836   def RemoveNode(self, node_name):
1837     """Callback called when removing nodes from the cluster.
1838
1839     @type node_name: str
1840     @param node_name: the name of the node to remove
1841
1842     """
1843     self._nodes.pop(node_name, None)
1844
1845   @staticmethod
1846   def _CheckRpcResult(result, nodes, failmsg):
1847     """Verifies the status of an RPC call.
1848
1849     Since we aim to keep consistency should this node (the current
1850     master) fail, we will log errors if our rpc fail, and especially
1851     log the case when more than half of the nodes fails.
1852
1853     @param result: the data as returned from the rpc call
1854     @type nodes: list
1855     @param nodes: the list of nodes we made the call to
1856     @type failmsg: str
1857     @param failmsg: the identifier to be used for logging
1858
1859     """
1860     failed = []
1861     success = []
1862
1863     for node in nodes:
1864       msg = result[node].fail_msg
1865       if msg:
1866         failed.append(node)
1867         logging.error("RPC call %s (%s) failed on node %s: %s",
1868                       result[node].call, failmsg, node, msg)
1869       else:
1870         success.append(node)
1871
1872     # +1 for the master node
1873     if (len(success) + 1) < len(failed):
1874       # TODO: Handle failing nodes
1875       logging.error("More than half of the nodes failed")
1876
1877   def _GetNodeIp(self):
1878     """Helper for returning the node name/ip list.
1879
1880     @rtype: (list, list)
1881     @return: a tuple of two lists, the first one with the node
1882         names and the second one with the node addresses
1883
1884     """
1885     # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1886     name_list = self._nodes.keys()
1887     addr_list = [self._nodes[name] for name in name_list]
1888     return name_list, addr_list
1889
1890   def _UpdateJobQueueFile(self, file_name, data, replicate):
1891     """Writes a file locally and then replicates it to all nodes.
1892
1893     This function will replace the contents of a file on the local
1894     node and then replicate it to all the other nodes we have.
1895
1896     @type file_name: str
1897     @param file_name: the path of the file to be replicated
1898     @type data: str
1899     @param data: the new contents of the file
1900     @type replicate: boolean
1901     @param replicate: whether to spread the changes to the remote nodes
1902
1903     """
1904     getents = runtime.GetEnts()
1905     utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1906                     gid=getents.daemons_gid,
1907                     mode=constants.JOB_QUEUE_FILES_PERMS)
1908
1909     if replicate:
1910       names, addrs = self._GetNodeIp()
1911       result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
1912       self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1913
1914   def _RenameFilesUnlocked(self, rename):
1915     """Renames a file locally and then replicate the change.
1916
1917     This function will rename a file in the local queue directory
1918     and then replicate this rename to all the other nodes we have.
1919
1920     @type rename: list of (old, new)
1921     @param rename: List containing tuples mapping old to new names
1922
1923     """
1924     # Rename them locally
1925     for old, new in rename:
1926       utils.RenameFile(old, new, mkdir=True)
1927
1928     # ... and on all nodes
1929     names, addrs = self._GetNodeIp()
1930     result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
1931     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1932
1933   def _NewSerialsUnlocked(self, count):
1934     """Generates a new job identifier.
1935
1936     Job identifiers are unique during the lifetime of a cluster.
1937
1938     @type count: integer
1939     @param count: how many serials to return
1940     @rtype: list of int
1941     @return: a list of job identifiers.
1942
1943     """
1944     assert ht.TNonNegativeInt(count)
1945
1946     # New number
1947     serial = self._last_serial + count
1948
1949     # Write to file
1950     self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
1951                              "%s\n" % serial, True)
1952
1953     result = [jstore.FormatJobID(v)
1954               for v in range(self._last_serial + 1, serial + 1)]
1955
1956     # Keep it only if we were able to write the file
1957     self._last_serial = serial
1958
1959     assert len(result) == count
1960
1961     return result
1962
1963   @staticmethod
1964   def _GetJobPath(job_id):
1965     """Returns the job file for a given job id.
1966
1967     @type job_id: str
1968     @param job_id: the job identifier
1969     @rtype: str
1970     @return: the path to the job file
1971
1972     """
1973     return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
1974
1975   @staticmethod
1976   def _GetArchivedJobPath(job_id):
1977     """Returns the archived job file for a give job id.
1978
1979     @type job_id: str
1980     @param job_id: the job identifier
1981     @rtype: str
1982     @return: the path to the archived job file
1983
1984     """
1985     return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
1986                           jstore.GetArchiveDirectory(job_id),
1987                           "job-%s" % job_id)
1988
1989   @staticmethod
1990   def _DetermineJobDirectories(archived):
1991     """Build list of directories containing job files.
1992
1993     @type archived: bool
1994     @param archived: Whether to include directories for archived jobs
1995     @rtype: list
1996
1997     """
1998     result = [pathutils.QUEUE_DIR]
1999
2000     if archived:
2001       archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
2002       result.extend(map(compat.partial(utils.PathJoin, archive_path),
2003                         utils.ListVisibleFiles(archive_path)))
2004
2005     return result
2006
2007   @classmethod
2008   def _GetJobIDsUnlocked(cls, sort=True, archived=False):
2009     """Return all known job IDs.
2010
2011     The method only looks at disk because it's a requirement that all
2012     jobs are present on disk (so in the _memcache we don't have any
2013     extra IDs).
2014
2015     @type sort: boolean
2016     @param sort: perform sorting on the returned job ids
2017     @rtype: list
2018     @return: the list of job IDs
2019
2020     """
2021     jlist = []
2022
2023     for path in cls._DetermineJobDirectories(archived):
2024       for filename in utils.ListVisibleFiles(path):
2025         m = constants.JOB_FILE_RE.match(filename)
2026         if m:
2027           jlist.append(int(m.group(1)))
2028
2029     if sort:
2030       jlist.sort()
2031     return jlist
2032
2033   def _LoadJobUnlocked(self, job_id):
2034     """Loads a job from the disk or memory.
2035
2036     Given a job id, this will return the cached job object if
2037     existing, or try to load the job from the disk. If loading from
2038     disk, it will also add the job to the cache.
2039
2040     @type job_id: int
2041     @param job_id: the job id
2042     @rtype: L{_QueuedJob} or None
2043     @return: either None or the job object
2044
2045     """
2046     job = self._memcache.get(job_id, None)
2047     if job:
2048       logging.debug("Found job %s in memcache", job_id)
2049       assert job.writable, "Found read-only job in memcache"
2050       return job
2051
2052     try:
2053       job = self._LoadJobFromDisk(job_id, False)
2054       if job is None:
2055         return job
2056     except errors.JobFileCorrupted:
2057       old_path = self._GetJobPath(job_id)
2058       new_path = self._GetArchivedJobPath(job_id)
2059       if old_path == new_path:
2060         # job already archived (future case)
2061         logging.exception("Can't parse job %s", job_id)
2062       else:
2063         # non-archived case
2064         logging.exception("Can't parse job %s, will archive.", job_id)
2065         self._RenameFilesUnlocked([(old_path, new_path)])
2066       return None
2067
2068     assert job.writable, "Job just loaded is not writable"
2069
2070     self._memcache[job_id] = job
2071     logging.debug("Added job %s to the cache", job_id)
2072     return job
2073
2074   def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
2075     """Load the given job file from disk.
2076
2077     Given a job file, read, load and restore it in a _QueuedJob format.
2078
2079     @type job_id: int
2080     @param job_id: job identifier
2081     @type try_archived: bool
2082     @param try_archived: Whether to try loading an archived job
2083     @rtype: L{_QueuedJob} or None
2084     @return: either None or the job object
2085
2086     """
2087     path_functions = [(self._GetJobPath, False)]
2088
2089     if try_archived:
2090       path_functions.append((self._GetArchivedJobPath, True))
2091
2092     raw_data = None
2093     archived = None
2094
2095     for (fn, archived) in path_functions:
2096       filepath = fn(job_id)
2097       logging.debug("Loading job from %s", filepath)
2098       try:
2099         raw_data = utils.ReadFile(filepath)
2100       except EnvironmentError, err:
2101         if err.errno != errno.ENOENT:
2102           raise
2103       else:
2104         break
2105
2106     if not raw_data:
2107       return None
2108
2109     if writable is None:
2110       writable = not archived
2111
2112     try:
2113       data = serializer.LoadJson(raw_data)
2114       job = _QueuedJob.Restore(self, data, writable, archived)
2115     except Exception, err: # pylint: disable=W0703
2116       raise errors.JobFileCorrupted(err)
2117
2118     return job
2119
2120   def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
2121     """Load the given job file from disk.
2122
2123     Given a job file, read, load and restore it in a _QueuedJob format.
2124     In case of error reading the job, it gets returned as None, and the
2125     exception is logged.
2126
2127     @type job_id: int
2128     @param job_id: job identifier
2129     @type try_archived: bool
2130     @param try_archived: Whether to try loading an archived job
2131     @rtype: L{_QueuedJob} or None
2132     @return: either None or the job object
2133
2134     """
2135     try:
2136       return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
2137     except (errors.JobFileCorrupted, EnvironmentError):
2138       logging.exception("Can't load/parse job %s", job_id)
2139       return None
2140
2141   def _UpdateQueueSizeUnlocked(self):
2142     """Update the queue size.
2143
2144     """
2145     self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
2146
2147   @locking.ssynchronized(_LOCK)
2148   @_RequireOpenQueue
2149   def SetDrainFlag(self, drain_flag):
2150     """Sets the drain flag for the queue.
2151
2152     @type drain_flag: boolean
2153     @param drain_flag: Whether to set or unset the drain flag
2154
2155     """
2156     # Change flag locally
2157     jstore.SetDrainFlag(drain_flag)
2158
2159     self._drained = drain_flag
2160
2161     # ... and on all nodes
2162     (names, addrs) = self._GetNodeIp()
2163     result = \
2164       self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag)
2165     self._CheckRpcResult(result, self._nodes,
2166                          "Setting queue drain flag to %s" % drain_flag)
2167
2168     return True
2169
2170   @_RequireOpenQueue
2171   def _SubmitJobUnlocked(self, job_id, ops):
2172     """Create and store a new job.
2173
2174     This enters the job into our job queue and also puts it on the new
2175     queue, in order for it to be picked up by the queue processors.
2176
2177     @type job_id: job ID
2178     @param job_id: the job ID for the new job
2179     @type ops: list
2180     @param ops: The list of OpCodes that will become the new job.
2181     @rtype: L{_QueuedJob}
2182     @return: the job object to be queued
2183     @raise errors.JobQueueFull: if the job queue has too many jobs in it
2184     @raise errors.GenericError: If an opcode is not valid
2185
2186     """
2187     if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
2188       raise errors.JobQueueFull()
2189
2190     job = _QueuedJob(self, job_id, ops, True)
2191
2192     for idx, op in enumerate(job.ops):
2193       # Check priority
2194       if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
2195         allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2196         raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
2197                                   " are %s" % (idx, op.priority, allowed))
2198
2199       # Check job dependencies
2200       dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
2201       if not opcodes.TNoRelativeJobDependencies(dependencies):
2202         raise errors.GenericError("Opcode %s has invalid dependencies, must"
2203                                   " match %s: %s" %
2204                                   (idx, opcodes.TNoRelativeJobDependencies,
2205                                    dependencies))
2206
2207     # Write to disk
2208     self.UpdateJobUnlocked(job)
2209
2210     self._queue_size += 1
2211
2212     logging.debug("Adding new job %s to the cache", job_id)
2213     self._memcache[job_id] = job
2214
2215     return job
2216
2217   @locking.ssynchronized(_LOCK)
2218   @_RequireOpenQueue
2219   @_RequireNonDrainedQueue
2220   def SubmitJob(self, ops):
2221     """Create and store a new job.
2222
2223     @see: L{_SubmitJobUnlocked}
2224
2225     """
2226     (job_id, ) = self._NewSerialsUnlocked(1)
2227     self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
2228     return job_id
2229
2230   @locking.ssynchronized(_LOCK)
2231   @_RequireOpenQueue
2232   @_RequireNonDrainedQueue
2233   def SubmitManyJobs(self, jobs):
2234     """Create and store multiple jobs.
2235
2236     @see: L{_SubmitJobUnlocked}
2237
2238     """
2239     all_job_ids = self._NewSerialsUnlocked(len(jobs))
2240
2241     (results, added_jobs) = \
2242       self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])
2243
2244     self._EnqueueJobsUnlocked(added_jobs)
2245
2246     return results
2247
2248   @staticmethod
2249   def _FormatSubmitError(msg, ops):
2250     """Formats errors which occurred while submitting a job.
2251
2252     """
2253     return ("%s; opcodes %s" %
2254             (msg, utils.CommaJoin(op.Summary() for op in ops)))
2255
2256   @staticmethod
2257   def _ResolveJobDependencies(resolve_fn, deps):
2258     """Resolves relative job IDs in dependencies.
2259
2260     @type resolve_fn: callable
2261     @param resolve_fn: Function to resolve a relative job ID
2262     @type deps: list
2263     @param deps: Dependencies
2264     @rtype: tuple; (boolean, string or list)
2265     @return: If successful (first tuple item), the returned list contains
2266       resolved job IDs along with the requested status; if not successful,
2267       the second element is an error message
2268
2269     """
2270     result = []
2271
2272     for (dep_job_id, dep_status) in deps:
2273       if ht.TRelativeJobId(dep_job_id):
2274         assert ht.TInt(dep_job_id) and dep_job_id < 0
2275         try:
2276           job_id = resolve_fn(dep_job_id)
2277         except IndexError:
2278           # Abort
2279           return (False, "Unable to resolve relative job ID %s" % dep_job_id)
2280       else:
2281         job_id = dep_job_id
2282
2283       result.append((job_id, dep_status))
2284
2285     return (True, result)
2286
2287   def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
2288     """Create and store multiple jobs.
2289
2290     @see: L{_SubmitJobUnlocked}
2291
2292     """
2293     results = []
2294     added_jobs = []
2295
2296     def resolve_fn(job_idx, reljobid):
2297       assert reljobid < 0
2298       return (previous_job_ids + job_ids[:job_idx])[reljobid]
2299
2300     for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
2301       for op in ops:
2302         if getattr(op, opcodes.DEPEND_ATTR, None):
2303           (status, data) = \
2304             self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
2305                                          op.depends)
2306           if not status:
2307             # Abort resolving dependencies
2308             assert ht.TNonEmptyString(data), "No error message"
2309             break
2310           # Use resolved dependencies
2311           op.depends = data
2312       else:
2313         try:
2314           job = self._SubmitJobUnlocked(job_id, ops)
2315         except errors.GenericError, err:
2316           status = False
2317           data = self._FormatSubmitError(str(err), ops)
2318         else:
2319           status = True
2320           data = job_id
2321           added_jobs.append(job)
2322
2323       results.append((status, data))
2324
2325     return (results, added_jobs)
2326
2327   @locking.ssynchronized(_LOCK)
2328   def _EnqueueJobs(self, jobs):
2329     """Helper function to add jobs to worker pool's queue.
2330
2331     @type jobs: list
2332     @param jobs: List of all jobs
2333
2334     """
2335     return self._EnqueueJobsUnlocked(jobs)
2336
2337   def _EnqueueJobsUnlocked(self, jobs):
2338     """Helper function to add jobs to worker pool's queue.
2339
2340     @type jobs: list
2341     @param jobs: List of all jobs
2342
2343     """
2344     assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
2345     self._wpool.AddManyTasks([(job, ) for job in jobs],
2346                              priority=[job.CalcPriority() for job in jobs],
2347                              task_id=map(_GetIdAttr, jobs))
2348
2349   def _GetJobStatusForDependencies(self, job_id):
2350     """Gets the status of a job for dependencies.
2351
2352     @type job_id: int
2353     @param job_id: Job ID
2354     @raise errors.JobLost: If job can't be found
2355
2356     """
2357     # Not using in-memory cache as doing so would require an exclusive lock
2358
2359     # Try to load from disk
2360     job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2361
2362     assert not job.writable, "Got writable job" # pylint: disable=E1101
2363
2364     if job:
2365       return job.CalcStatus()
2366
2367     raise errors.JobLost("Job %s not found" % job_id)
2368
2369   @_RequireOpenQueue
2370   def UpdateJobUnlocked(self, job, replicate=True):
2371     """Update a job's on disk storage.
2372
2373     After a job has been modified, this function needs to be called in
2374     order to write the changes to disk and replicate them to the other
2375     nodes.
2376
2377     @type job: L{_QueuedJob}
2378     @param job: the changed job
2379     @type replicate: boolean
2380     @param replicate: whether to replicate the change to remote nodes
2381
2382     """
2383     if __debug__:
2384       finalized = job.CalcStatus() in constants.JOBS_FINALIZED
2385       assert (finalized ^ (job.end_timestamp is None))
2386       assert job.writable, "Can't update read-only job"
2387       assert not job.archived, "Can't update archived job"
2388
2389     filename = self._GetJobPath(job.id)
2390     data = serializer.DumpJson(job.Serialize())
2391     logging.debug("Writing job %s to %s", job.id, filename)
2392     self._UpdateJobQueueFile(filename, data, replicate)
2393
2394   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
2395                         timeout):
2396     """Waits for changes in a job.
2397
2398     @type job_id: int
2399     @param job_id: Job identifier
2400     @type fields: list of strings
2401     @param fields: Which fields to check for changes
2402     @type prev_job_info: list or None
2403     @param prev_job_info: Last job information returned
2404     @type prev_log_serial: int
2405     @param prev_log_serial: Last job message serial number
2406     @type timeout: float
2407     @param timeout: maximum time to wait in seconds
2408     @rtype: tuple (job info, log entries)
2409     @return: a tuple of the job information as required via
2410         the fields parameter, and the log entries as a list
2411
2412         if the job has not changed and the timeout has expired,
2413         we instead return a special value,
2414         L{constants.JOB_NOTCHANGED}, which should be interpreted
2415         as such by the clients
2416
2417     """
2418     load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
2419                              writable=False)
2420
2421     helper = _WaitForJobChangesHelper()
2422
2423     return helper(self._GetJobPath(job_id), load_fn,
2424                   fields, prev_job_info, prev_log_serial, timeout)
2425
2426   @locking.ssynchronized(_LOCK)
2427   @_RequireOpenQueue
2428   def CancelJob(self, job_id):
2429     """Cancels a job.
2430
2431     This will only succeed if the job has not started yet.
2432
2433     @type job_id: int
2434     @param job_id: job ID of job to be cancelled.
2435
2436     """
2437     logging.info("Cancelling job %s", job_id)
2438
2439     return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
2440
2441   @locking.ssynchronized(_LOCK)
2442   @_RequireOpenQueue
2443   def ChangeJobPriority(self, job_id, priority):
2444     """Changes a job's priority.
2445
2446     @type job_id: int
2447     @param job_id: ID of the job whose priority should be changed
2448     @type priority: int
2449     @param priority: New priority
2450
2451     """
2452     logging.info("Changing priority of job %s to %s", job_id, priority)
2453
2454     if priority not in constants.OP_PRIO_SUBMIT_VALID:
2455       allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2456       raise errors.GenericError("Invalid priority %s, allowed are %s" %
2457                                 (priority, allowed))
2458
2459     def fn(job):
2460       (success, msg) = job.ChangePriority(priority)
2461
2462       if success:
2463         try:
2464           self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
2465         except workerpool.NoSuchTask:
2466           logging.debug("Job %s is not in workerpool at this time", job.id)
2467
2468       return (success, msg)
2469
2470     return self._ModifyJobUnlocked(job_id, fn)
2471
2472   def _ModifyJobUnlocked(self, job_id, mod_fn):
2473     """Modifies a job.
2474
2475     @type job_id: int
2476     @param job_id: Job ID
2477     @type mod_fn: callable
2478     @param mod_fn: Modifying function, receiving job object as parameter,
2479       returning tuple of (status boolean, message string)
2480
2481     """
2482     job = self._LoadJobUnlocked(job_id)
2483     if not job:
2484       logging.debug("Job %s not found", job_id)
2485       return (False, "Job %s not found" % job_id)
2486
2487     assert job.writable, "Can't modify read-only job"
2488     assert not job.archived, "Can't modify archived job"
2489
2490     (success, msg) = mod_fn(job)
2491
2492     if success:
2493       # If the job was finalized (e.g. cancelled), this is the final write
2494       # allowed. The job can be archived anytime.
2495       self.UpdateJobUnlocked(job)
2496
2497     return (success, msg)
2498
2499   @_RequireOpenQueue
2500   def _ArchiveJobsUnlocked(self, jobs):
2501     """Archives jobs.
2502
2503     @type jobs: list of L{_QueuedJob}
2504     @param jobs: Job objects
2505     @rtype: int
2506     @return: Number of archived jobs
2507
2508     """
2509     archive_jobs = []
2510     rename_files = []
2511     for job in jobs:
2512       assert job.writable, "Can't archive read-only job"
2513       assert not job.archived, "Can't cancel archived job"
2514
2515       if job.CalcStatus() not in constants.JOBS_FINALIZED:
2516         logging.debug("Job %s is not yet done", job.id)
2517         continue
2518
2519       archive_jobs.append(job)
2520
2521       old = self._GetJobPath(job.id)
2522       new = self._GetArchivedJobPath(job.id)
2523       rename_files.append((old, new))
2524
2525     # TODO: What if 1..n files fail to rename?
2526     self._RenameFilesUnlocked(rename_files)
2527
2528     logging.debug("Successfully archived job(s) %s",
2529                   utils.CommaJoin(job.id for job in archive_jobs))
2530
2531     # Since we haven't quite checked, above, if we succeeded or failed renaming
2532     # the files, we update the cached queue size from the filesystem. When we
2533     # get around to fix the TODO: above, we can use the number of actually
2534     # archived jobs to fix this.
2535     self._UpdateQueueSizeUnlocked()
2536     return len(archive_jobs)
2537
2538   @locking.ssynchronized(_LOCK)
2539   @_RequireOpenQueue
2540   def ArchiveJob(self, job_id):
2541     """Archives a job.
2542
2543     This is just a wrapper over L{_ArchiveJobsUnlocked}.
2544
2545     @type job_id: int
2546     @param job_id: Job ID of job to be archived.
2547     @rtype: bool
2548     @return: Whether job was archived
2549
2550     """
2551     logging.info("Archiving job %s", job_id)
2552
2553     job = self._LoadJobUnlocked(job_id)
2554     if not job:
2555       logging.debug("Job %s not found", job_id)
2556       return False
2557
2558     return self._ArchiveJobsUnlocked([job]) == 1
2559
2560   @locking.ssynchronized(_LOCK)
2561   @_RequireOpenQueue
2562   def AutoArchiveJobs(self, age, timeout):
2563     """Archives all jobs based on age.
2564
2565     The method will archive all jobs which are older than the age
2566     parameter. For jobs that don't have an end timestamp, the start
2567     timestamp will be considered. The special '-1' age will cause
2568     archival of all jobs (that are not running or queued).
2569
2570     @type age: int
2571     @param age: the minimum age in seconds
2572
2573     """
2574     logging.info("Archiving jobs with age more than %s seconds", age)
2575
2576     now = time.time()
2577     end_time = now + timeout
2578     archived_count = 0
2579     last_touched = 0
2580
2581     all_job_ids = self._GetJobIDsUnlocked()
2582     pending = []
2583     for idx, job_id in enumerate(all_job_ids):
2584       last_touched = idx + 1
2585
2586       # Not optimal because jobs could be pending
2587       # TODO: Measure average duration for job archival and take number of
2588       # pending jobs into account.
2589       if time.time() > end_time:
2590         break
2591
2592       # Returns None if the job failed to load
2593       job = self._LoadJobUnlocked(job_id)
2594       if job:
2595         if job.end_timestamp is None:
2596           if job.start_timestamp is None:
2597             job_age = job.received_timestamp
2598           else:
2599             job_age = job.start_timestamp
2600         else:
2601           job_age = job.end_timestamp
2602
2603         if age == -1 or now - job_age[0] > age:
2604           pending.append(job)
2605
2606           # Archive 10 jobs at a time
2607           if len(pending) >= 10:
2608             archived_count += self._ArchiveJobsUnlocked(pending)
2609             pending = []
2610
2611     if pending:
2612       archived_count += self._ArchiveJobsUnlocked(pending)
2613
2614     return (archived_count, len(all_job_ids) - last_touched)
2615
2616   def _Query(self, fields, qfilter):
2617     qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
2618                        namefield="id")
2619
2620     # Archived jobs are only looked at if the "archived" field is referenced
2621     # either as a requested field or in the filter. By default archived jobs
2622     # are ignored.
2623     include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())
2624
2625     job_ids = qobj.RequestedNames()
2626
2627     list_all = (job_ids is None)
2628
2629     if list_all:
2630       # Since files are added to/removed from the queue atomically, there's no
2631       # risk of getting the job ids in an inconsistent state.
2632       job_ids = self._GetJobIDsUnlocked(archived=include_archived)
2633
2634     jobs = []
2635
2636     for job_id in job_ids:
2637       job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2638       if job is not None or not list_all:
2639         jobs.append((job_id, job))
2640
2641     return (qobj, jobs, list_all)
2642
2643   def QueryJobs(self, fields, qfilter):
2644     """Returns a list of jobs in queue.
2645
2646     @type fields: sequence
2647     @param fields: List of wanted fields
2648     @type qfilter: None or query2 filter (list)
2649     @param qfilter: Query filter
2650
2651     """
2652     (qobj, ctx, _) = self._Query(fields, qfilter)
2653
2654     return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
2655
2656   def OldStyleQueryJobs(self, job_ids, fields):
2657     """Returns a list of jobs in queue.
2658
2659     @type job_ids: list
2660     @param job_ids: sequence of job identifiers or None for all
2661     @type fields: list
2662     @param fields: names of fields to return
2663     @rtype: list
2664     @return: list one element per job, each element being list with
2665         the requested fields
2666
2667     """
2668     # backwards compat:
2669     job_ids = [int(jid) for jid in job_ids]
2670     qfilter = qlang.MakeSimpleFilter("id", job_ids)
2671
2672     (qobj, ctx, _) = self._Query(fields, qfilter)
2673
2674     return qobj.OldStyleQuery(ctx, sort_by_name=False)
2675
2676   @locking.ssynchronized(_LOCK)
2677   def PrepareShutdown(self):
2678     """Prepare to stop the job queue.
2679
2680     Disables execution of jobs in the workerpool and returns whether there are
2681     any jobs currently running. If the latter is the case, the job queue is not
2682     yet ready for shutdown. Once this function returns C{True} L{Shutdown} can
2683     be called without interfering with any job. Queued and unfinished jobs will
2684     be resumed next time.
2685
2686     Once this function has been called no new job submissions will be accepted
2687     (see L{_RequireNonDrainedQueue}).
2688
2689     @rtype: bool
2690     @return: Whether there are any running jobs
2691
2692     """
2693     if self._accepting_jobs:
2694       self._accepting_jobs = False
2695
2696       # Tell worker pool to stop processing pending tasks
2697       self._wpool.SetActive(False)
2698
2699     return self._wpool.HasRunningTasks()
2700
2701   def AcceptingJobsUnlocked(self):
2702     """Returns whether jobs are accepted.
2703
2704     Once L{PrepareShutdown} has been called, no new jobs are accepted and the
2705     queue is shutting down.
2706
2707     @rtype: bool
2708
2709     """
2710     return self._accepting_jobs
2711
2712   @locking.ssynchronized(_LOCK)
2713   @_RequireOpenQueue
2714   def Shutdown(self):
2715     """Stops the job queue.
2716
2717     This shutdowns all the worker threads an closes the queue.
2718
2719     """
2720     self._wpool.TerminateWorkers()
2721
2722     self._queue_filelock.Close()
2723     self._queue_filelock = None