Merge branch 'stable-2.9' into stable-2.10
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2014 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import logging
33 import errno
34 import time
35 import weakref
36 import threading
37 import itertools
38 import operator
39
40 try:
41   # pylint: disable=E0611
42   from pyinotify import pyinotify
43 except ImportError:
44   import pyinotify
45
46 from ganeti import asyncnotifier
47 from ganeti import constants
48 from ganeti import serializer
49 from ganeti import workerpool
50 from ganeti import locking
51 from ganeti import opcodes
52 from ganeti import opcodes_base
53 from ganeti import errors
54 from ganeti import mcpu
55 from ganeti import utils
56 from ganeti import jstore
57 from ganeti import rpc
58 from ganeti import runtime
59 from ganeti import netutils
60 from ganeti import compat
61 from ganeti import ht
62 from ganeti import query
63 from ganeti import qlang
64 from ganeti import pathutils
65 from ganeti import vcluster
66
67
68 JOBQUEUE_THREADS = 25
69
70 # member lock names to be passed to @ssynchronized decorator
71 _LOCK = "_lock"
72 _QUEUE = "_queue"
73
74 #: Retrieves "id" attribute
75 _GetIdAttr = operator.attrgetter("id")
76
77
78 class CancelJob(Exception):
79   """Special exception to cancel a job.
80
81   """
82
83
84 class QueueShutdown(Exception):
85   """Special exception to abort a job when the job queue is shutting down.
86
87   """
88
89
90 def TimeStampNow():
91   """Returns the current timestamp.
92
93   @rtype: tuple
94   @return: the current time in the (seconds, microseconds) format
95
96   """
97   return utils.SplitTime(time.time())
98
99
100 def _CallJqUpdate(runner, names, file_name, content):
101   """Updates job queue file after virtualizing filename.
102
103   """
104   virt_file_name = vcluster.MakeVirtualPath(file_name)
105   return runner.call_jobqueue_update(names, virt_file_name, content)
106
107
108 class _SimpleJobQuery:
109   """Wrapper for job queries.
110
111   Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.
112
113   """
114   def __init__(self, fields):
115     """Initializes this class.
116
117     """
118     self._query = query.Query(query.JOB_FIELDS, fields)
119
120   def __call__(self, job):
121     """Executes a job query using cached field list.
122
123     """
124     return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
125
126
127 class _QueuedOpCode(object):
128   """Encapsulates an opcode object.
129
130   @ivar log: holds the execution log and consists of tuples
131   of the form C{(log_serial, timestamp, level, message)}
132   @ivar input: the OpCode we encapsulate
133   @ivar status: the current status
134   @ivar result: the result of the LU execution
135   @ivar start_timestamp: timestamp for the start of the execution
136   @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
137   @ivar stop_timestamp: timestamp for the end of the execution
138
139   """
140   __slots__ = ["input", "status", "result", "log", "priority",
141                "start_timestamp", "exec_timestamp", "end_timestamp",
142                "__weakref__"]
143
144   def __init__(self, op):
145     """Initializes instances of this class.
146
147     @type op: L{opcodes.OpCode}
148     @param op: the opcode we encapsulate
149
150     """
151     self.input = op
152     self.status = constants.OP_STATUS_QUEUED
153     self.result = None
154     self.log = []
155     self.start_timestamp = None
156     self.exec_timestamp = None
157     self.end_timestamp = None
158
159     # Get initial priority (it might change during the lifetime of this opcode)
160     self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
161
162   @classmethod
163   def Restore(cls, state):
164     """Restore the _QueuedOpCode from the serialized form.
165
166     @type state: dict
167     @param state: the serialized state
168     @rtype: _QueuedOpCode
169     @return: a new _QueuedOpCode instance
170
171     """
172     obj = _QueuedOpCode.__new__(cls)
173     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
174     obj.status = state["status"]
175     obj.result = state["result"]
176     obj.log = state["log"]
177     obj.start_timestamp = state.get("start_timestamp", None)
178     obj.exec_timestamp = state.get("exec_timestamp", None)
179     obj.end_timestamp = state.get("end_timestamp", None)
180     obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
181     return obj
182
183   def Serialize(self):
184     """Serializes this _QueuedOpCode.
185
186     @rtype: dict
187     @return: the dictionary holding the serialized state
188
189     """
190     return {
191       "input": self.input.__getstate__(),
192       "status": self.status,
193       "result": self.result,
194       "log": self.log,
195       "start_timestamp": self.start_timestamp,
196       "exec_timestamp": self.exec_timestamp,
197       "end_timestamp": self.end_timestamp,
198       "priority": self.priority,
199       }
200
201
202 class _QueuedJob(object):
203   """In-memory job representation.
204
205   This is what we use to track the user-submitted jobs. Locking must
206   be taken care of by users of this class.
207
208   @type queue: L{JobQueue}
209   @ivar queue: the parent queue
210   @ivar id: the job ID
211   @type ops: list
212   @ivar ops: the list of _QueuedOpCode that constitute the job
213   @type log_serial: int
214   @ivar log_serial: holds the index for the next log entry
215   @ivar received_timestamp: the timestamp for when the job was received
216   @ivar start_timestmap: the timestamp for start of execution
217   @ivar end_timestamp: the timestamp for end of execution
218   @ivar writable: Whether the job is allowed to be modified
219
220   """
221   # pylint: disable=W0212
222   __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
223                "received_timestamp", "start_timestamp", "end_timestamp",
224                "__weakref__", "processor_lock", "writable", "archived"]
225
226   def _AddReasons(self):
227     """Extend the reason trail
228
229     Add the reason for all the opcodes of this job to be executed.
230
231     """
232     count = 0
233     for queued_op in self.ops:
234       op = queued_op.input
235       reason_src = opcodes_base.NameToReasonSrc(op.__class__.__name__)
236       reason_text = "job=%d;index=%d" % (self.id, count)
237       reason = getattr(op, "reason", [])
238       reason.append((reason_src, reason_text, utils.EpochNano()))
239       op.reason = reason
240       count = count + 1
241
242   def __init__(self, queue, job_id, ops, writable):
243     """Constructor for the _QueuedJob.
244
245     @type queue: L{JobQueue}
246     @param queue: our parent queue
247     @type job_id: job_id
248     @param job_id: our job id
249     @type ops: list
250     @param ops: the list of opcodes we hold, which will be encapsulated
251         in _QueuedOpCodes
252     @type writable: bool
253     @param writable: Whether job can be modified
254
255     """
256     if not ops:
257       raise errors.GenericError("A job needs at least one opcode")
258
259     self.queue = queue
260     self.id = int(job_id)
261     self.ops = [_QueuedOpCode(op) for op in ops]
262     self._AddReasons()
263     self.log_serial = 0
264     self.received_timestamp = TimeStampNow()
265     self.start_timestamp = None
266     self.end_timestamp = None
267     self.archived = False
268
269     self._InitInMemory(self, writable)
270
271     assert not self.archived, "New jobs can not be marked as archived"
272
273   @staticmethod
274   def _InitInMemory(obj, writable):
275     """Initializes in-memory variables.
276
277     """
278     obj.writable = writable
279     obj.ops_iter = None
280     obj.cur_opctx = None
281
282     # Read-only jobs are not processed and therefore don't need a lock
283     if writable:
284       obj.processor_lock = threading.Lock()
285     else:
286       obj.processor_lock = None
287
288   def __repr__(self):
289     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
290               "id=%s" % self.id,
291               "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
292
293     return "<%s at %#x>" % (" ".join(status), id(self))
294
295   @classmethod
296   def Restore(cls, queue, state, writable, archived):
297     """Restore a _QueuedJob from serialized state:
298
299     @type queue: L{JobQueue}
300     @param queue: to which queue the restored job belongs
301     @type state: dict
302     @param state: the serialized state
303     @type writable: bool
304     @param writable: Whether job can be modified
305     @type archived: bool
306     @param archived: Whether job was already archived
307     @rtype: _JobQueue
308     @return: the restored _JobQueue instance
309
310     """
311     obj = _QueuedJob.__new__(cls)
312     obj.queue = queue
313     obj.id = int(state["id"])
314     obj.received_timestamp = state.get("received_timestamp", None)
315     obj.start_timestamp = state.get("start_timestamp", None)
316     obj.end_timestamp = state.get("end_timestamp", None)
317     obj.archived = archived
318
319     obj.ops = []
320     obj.log_serial = 0
321     for op_state in state["ops"]:
322       op = _QueuedOpCode.Restore(op_state)
323       for log_entry in op.log:
324         obj.log_serial = max(obj.log_serial, log_entry[0])
325       obj.ops.append(op)
326
327     cls._InitInMemory(obj, writable)
328
329     return obj
330
331   def Serialize(self):
332     """Serialize the _JobQueue instance.
333
334     @rtype: dict
335     @return: the serialized state
336
337     """
338     return {
339       "id": self.id,
340       "ops": [op.Serialize() for op in self.ops],
341       "start_timestamp": self.start_timestamp,
342       "end_timestamp": self.end_timestamp,
343       "received_timestamp": self.received_timestamp,
344       }
345
346   def CalcStatus(self):
347     """Compute the status of this job.
348
349     This function iterates over all the _QueuedOpCodes in the job and
350     based on their status, computes the job status.
351
352     The algorithm is:
353       - if we find a cancelled, or finished with error, the job
354         status will be the same
355       - otherwise, the last opcode with the status one of:
356           - waitlock
357           - canceling
358           - running
359
360         will determine the job status
361
362       - otherwise, it means either all opcodes are queued, or success,
363         and the job status will be the same
364
365     @return: the job status
366
367     """
368     status = constants.JOB_STATUS_QUEUED
369
370     all_success = True
371     for op in self.ops:
372       if op.status == constants.OP_STATUS_SUCCESS:
373         continue
374
375       all_success = False
376
377       if op.status == constants.OP_STATUS_QUEUED:
378         pass
379       elif op.status == constants.OP_STATUS_WAITING:
380         status = constants.JOB_STATUS_WAITING
381       elif op.status == constants.OP_STATUS_RUNNING:
382         status = constants.JOB_STATUS_RUNNING
383       elif op.status == constants.OP_STATUS_CANCELING:
384         status = constants.JOB_STATUS_CANCELING
385         break
386       elif op.status == constants.OP_STATUS_ERROR:
387         status = constants.JOB_STATUS_ERROR
388         # The whole job fails if one opcode failed
389         break
390       elif op.status == constants.OP_STATUS_CANCELED:
391         status = constants.OP_STATUS_CANCELED
392         break
393
394     if all_success:
395       status = constants.JOB_STATUS_SUCCESS
396
397     return status
398
399   def CalcPriority(self):
400     """Gets the current priority for this job.
401
402     Only unfinished opcodes are considered. When all are done, the default
403     priority is used.
404
405     @rtype: int
406
407     """
408     priorities = [op.priority for op in self.ops
409                   if op.status not in constants.OPS_FINALIZED]
410
411     if not priorities:
412       # All opcodes are done, assume default priority
413       return constants.OP_PRIO_DEFAULT
414
415     return min(priorities)
416
417   def GetLogEntries(self, newer_than):
418     """Selectively returns the log entries.
419
420     @type newer_than: None or int
421     @param newer_than: if this is None, return all log entries,
422         otherwise return only the log entries with serial higher
423         than this value
424     @rtype: list
425     @return: the list of the log entries selected
426
427     """
428     if newer_than is None:
429       serial = -1
430     else:
431       serial = newer_than
432
433     entries = []
434     for op in self.ops:
435       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
436
437     return entries
438
439   def GetInfo(self, fields):
440     """Returns information about a job.
441
442     @type fields: list
443     @param fields: names of fields to return
444     @rtype: list
445     @return: list with one element for each field
446     @raise errors.OpExecError: when an invalid field
447         has been passed
448
449     """
450     return _SimpleJobQuery(fields)(self)
451
452   def MarkUnfinishedOps(self, status, result):
453     """Mark unfinished opcodes with a given status and result.
454
455     This is an utility function for marking all running or waiting to
456     be run opcodes with a given status. Opcodes which are already
457     finalised are not changed.
458
459     @param status: a given opcode status
460     @param result: the opcode result
461
462     """
463     not_marked = True
464     for op in self.ops:
465       if op.status in constants.OPS_FINALIZED:
466         assert not_marked, "Finalized opcodes found after non-finalized ones"
467         continue
468       op.status = status
469       op.result = result
470       not_marked = False
471
472   def Finalize(self):
473     """Marks the job as finalized.
474
475     """
476     self.end_timestamp = TimeStampNow()
477
478   def Cancel(self):
479     """Marks job as canceled/-ing if possible.
480
481     @rtype: tuple; (bool, string)
482     @return: Boolean describing whether job was successfully canceled or marked
483       as canceling and a text message
484
485     """
486     status = self.CalcStatus()
487
488     if status == constants.JOB_STATUS_QUEUED:
489       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
490                              "Job canceled by request")
491       self.Finalize()
492       return (True, "Job %s canceled" % self.id)
493
494     elif status == constants.JOB_STATUS_WAITING:
495       # The worker will notice the new status and cancel the job
496       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
497       return (True, "Job %s will be canceled" % self.id)
498
499     else:
500       logging.debug("Job %s is no longer waiting in the queue", self.id)
501       return (False, "Job %s is no longer waiting in the queue" % self.id)
502
503   def ChangePriority(self, priority):
504     """Changes the job priority.
505
506     @type priority: int
507     @param priority: New priority
508     @rtype: tuple; (bool, string)
509     @return: Boolean describing whether job's priority was successfully changed
510       and a text message
511
512     """
513     status = self.CalcStatus()
514
515     if status in constants.JOBS_FINALIZED:
516       return (False, "Job %s is finished" % self.id)
517     elif status == constants.JOB_STATUS_CANCELING:
518       return (False, "Job %s is cancelling" % self.id)
519     else:
520       assert status in (constants.JOB_STATUS_QUEUED,
521                         constants.JOB_STATUS_WAITING,
522                         constants.JOB_STATUS_RUNNING)
523
524       changed = False
525       for op in self.ops:
526         if (op.status == constants.OP_STATUS_RUNNING or
527             op.status in constants.OPS_FINALIZED):
528           assert not changed, \
529             ("Found opcode for which priority should not be changed after"
530              " priority has been changed for previous opcodes")
531           continue
532
533         assert op.status in (constants.OP_STATUS_QUEUED,
534                              constants.OP_STATUS_WAITING)
535
536         changed = True
537
538         # Set new priority (doesn't modify opcode input)
539         op.priority = priority
540
541       if changed:
542         return (True, ("Priorities of pending opcodes for job %s have been"
543                        " changed to %s" % (self.id, priority)))
544       else:
545         return (False, "Job %s had no pending opcodes" % self.id)
546
547
548 class _OpExecCallbacks(mcpu.OpExecCbBase):
549   def __init__(self, queue, job, op):
550     """Initializes this class.
551
552     @type queue: L{JobQueue}
553     @param queue: Job queue
554     @type job: L{_QueuedJob}
555     @param job: Job object
556     @type op: L{_QueuedOpCode}
557     @param op: OpCode
558
559     """
560     assert queue, "Queue is missing"
561     assert job, "Job is missing"
562     assert op, "Opcode is missing"
563
564     self._queue = queue
565     self._job = job
566     self._op = op
567
568   def _CheckCancel(self):
569     """Raises an exception to cancel the job if asked to.
570
571     """
572     # Cancel here if we were asked to
573     if self._op.status == constants.OP_STATUS_CANCELING:
574       logging.debug("Canceling opcode")
575       raise CancelJob()
576
577     # See if queue is shutting down
578     if not self._queue.AcceptingJobsUnlocked():
579       logging.debug("Queue is shutting down")
580       raise QueueShutdown()
581
582   @locking.ssynchronized(_QUEUE, shared=1)
583   def NotifyStart(self):
584     """Mark the opcode as running, not lock-waiting.
585
586     This is called from the mcpu code as a notifier function, when the LU is
587     finally about to start the Exec() method. Of course, to have end-user
588     visible results, the opcode must be initially (before calling into
589     Processor.ExecOpCode) set to OP_STATUS_WAITING.
590
591     """
592     assert self._op in self._job.ops
593     assert self._op.status in (constants.OP_STATUS_WAITING,
594                                constants.OP_STATUS_CANCELING)
595
596     # Cancel here if we were asked to
597     self._CheckCancel()
598
599     logging.debug("Opcode is now running")
600
601     self._op.status = constants.OP_STATUS_RUNNING
602     self._op.exec_timestamp = TimeStampNow()
603
604     # And finally replicate the job status
605     self._queue.UpdateJobUnlocked(self._job)
606
607   @locking.ssynchronized(_QUEUE, shared=1)
608   def _AppendFeedback(self, timestamp, log_type, log_msg):
609     """Internal feedback append function, with locks
610
611     """
612     self._job.log_serial += 1
613     self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
614     self._queue.UpdateJobUnlocked(self._job, replicate=False)
615
616   def Feedback(self, *args):
617     """Append a log entry.
618
619     """
620     assert len(args) < 3
621
622     if len(args) == 1:
623       log_type = constants.ELOG_MESSAGE
624       log_msg = args[0]
625     else:
626       (log_type, log_msg) = args
627
628     # The time is split to make serialization easier and not lose
629     # precision.
630     timestamp = utils.SplitTime(time.time())
631     self._AppendFeedback(timestamp, log_type, log_msg)
632
633   def CurrentPriority(self):
634     """Returns current priority for opcode.
635
636     """
637     assert self._op.status in (constants.OP_STATUS_WAITING,
638                                constants.OP_STATUS_CANCELING)
639
640     # Cancel here if we were asked to
641     self._CheckCancel()
642
643     return self._op.priority
644
645   def SubmitManyJobs(self, jobs):
646     """Submits jobs for processing.
647
648     See L{JobQueue.SubmitManyJobs}.
649
650     """
651     # Locking is done in job queue
652     return self._queue.SubmitManyJobs(jobs)
653
654
655 class _JobChangesChecker(object):
656   def __init__(self, fields, prev_job_info, prev_log_serial):
657     """Initializes this class.
658
659     @type fields: list of strings
660     @param fields: Fields requested by LUXI client
661     @type prev_job_info: string
662     @param prev_job_info: previous job info, as passed by the LUXI client
663     @type prev_log_serial: string
664     @param prev_log_serial: previous job serial, as passed by the LUXI client
665
666     """
667     self._squery = _SimpleJobQuery(fields)
668     self._prev_job_info = prev_job_info
669     self._prev_log_serial = prev_log_serial
670
671   def __call__(self, job):
672     """Checks whether job has changed.
673
674     @type job: L{_QueuedJob}
675     @param job: Job object
676
677     """
678     assert not job.writable, "Expected read-only job"
679
680     status = job.CalcStatus()
681     job_info = self._squery(job)
682     log_entries = job.GetLogEntries(self._prev_log_serial)
683
684     # Serializing and deserializing data can cause type changes (e.g. from
685     # tuple to list) or precision loss. We're doing it here so that we get
686     # the same modifications as the data received from the client. Without
687     # this, the comparison afterwards might fail without the data being
688     # significantly different.
689     # TODO: we just deserialized from disk, investigate how to make sure that
690     # the job info and log entries are compatible to avoid this further step.
691     # TODO: Doing something like in testutils.py:UnifyValueType might be more
692     # efficient, though floats will be tricky
693     job_info = serializer.LoadJson(serializer.DumpJson(job_info))
694     log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
695
696     # Don't even try to wait if the job is no longer running, there will be
697     # no changes.
698     if (status not in (constants.JOB_STATUS_QUEUED,
699                        constants.JOB_STATUS_RUNNING,
700                        constants.JOB_STATUS_WAITING) or
701         job_info != self._prev_job_info or
702         (log_entries and self._prev_log_serial != log_entries[0][0])):
703       logging.debug("Job %s changed", job.id)
704       return (job_info, log_entries)
705
706     return None
707
708
709 class _JobFileChangesWaiter(object):
710   def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
711     """Initializes this class.
712
713     @type filename: string
714     @param filename: Path to job file
715     @raises errors.InotifyError: if the notifier cannot be setup
716
717     """
718     self._wm = _inotify_wm_cls()
719     self._inotify_handler = \
720       asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
721     self._notifier = \
722       pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
723     try:
724       self._inotify_handler.enable()
725     except Exception:
726       # pyinotify doesn't close file descriptors automatically
727       self._notifier.stop()
728       raise
729
730   def _OnInotify(self, notifier_enabled):
731     """Callback for inotify.
732
733     """
734     if not notifier_enabled:
735       self._inotify_handler.enable()
736
737   def Wait(self, timeout):
738     """Waits for the job file to change.
739
740     @type timeout: float
741     @param timeout: Timeout in seconds
742     @return: Whether there have been events
743
744     """
745     assert timeout >= 0
746     have_events = self._notifier.check_events(timeout * 1000)
747     if have_events:
748       self._notifier.read_events()
749     self._notifier.process_events()
750     return have_events
751
752   def Close(self):
753     """Closes underlying notifier and its file descriptor.
754
755     """
756     self._notifier.stop()
757
758
759 class _JobChangesWaiter(object):
760   def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
761     """Initializes this class.
762
763     @type filename: string
764     @param filename: Path to job file
765
766     """
767     self._filewaiter = None
768     self._filename = filename
769     self._waiter_cls = _waiter_cls
770
771   def Wait(self, timeout):
772     """Waits for a job to change.
773
774     @type timeout: float
775     @param timeout: Timeout in seconds
776     @return: Whether there have been events
777
778     """
779     if self._filewaiter:
780       return self._filewaiter.Wait(timeout)
781
782     # Lazy setup: Avoid inotify setup cost when job file has already changed.
783     # If this point is reached, return immediately and let caller check the job
784     # file again in case there were changes since the last check. This avoids a
785     # race condition.
786     self._filewaiter = self._waiter_cls(self._filename)
787
788     return True
789
790   def Close(self):
791     """Closes underlying waiter.
792
793     """
794     if self._filewaiter:
795       self._filewaiter.Close()
796
797
798 class _WaitForJobChangesHelper(object):
799   """Helper class using inotify to wait for changes in a job file.
800
801   This class takes a previous job status and serial, and alerts the client when
802   the current job status has changed.
803
804   """
805   @staticmethod
806   def _CheckForChanges(counter, job_load_fn, check_fn):
807     if counter.next() > 0:
808       # If this isn't the first check the job is given some more time to change
809       # again. This gives better performance for jobs generating many
810       # changes/messages.
811       time.sleep(0.1)
812
813     job = job_load_fn()
814     if not job:
815       raise errors.JobLost()
816
817     result = check_fn(job)
818     if result is None:
819       raise utils.RetryAgain()
820
821     return result
822
823   def __call__(self, filename, job_load_fn,
824                fields, prev_job_info, prev_log_serial, timeout,
825                _waiter_cls=_JobChangesWaiter):
826     """Waits for changes on a job.
827
828     @type filename: string
829     @param filename: File on which to wait for changes
830     @type job_load_fn: callable
831     @param job_load_fn: Function to load job
832     @type fields: list of strings
833     @param fields: Which fields to check for changes
834     @type prev_job_info: list or None
835     @param prev_job_info: Last job information returned
836     @type prev_log_serial: int
837     @param prev_log_serial: Last job message serial number
838     @type timeout: float
839     @param timeout: maximum time to wait in seconds
840
841     """
842     counter = itertools.count()
843     try:
844       check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
845       waiter = _waiter_cls(filename)
846       try:
847         return utils.Retry(compat.partial(self._CheckForChanges,
848                                           counter, job_load_fn, check_fn),
849                            utils.RETRY_REMAINING_TIME, timeout,
850                            wait_fn=waiter.Wait)
851       finally:
852         waiter.Close()
853     except errors.JobLost:
854       return None
855     except utils.RetryTimeout:
856       return constants.JOB_NOTCHANGED
857
858
859 def _EncodeOpError(err):
860   """Encodes an error which occurred while processing an opcode.
861
862   """
863   if isinstance(err, errors.GenericError):
864     to_encode = err
865   else:
866     to_encode = errors.OpExecError(str(err))
867
868   return errors.EncodeException(to_encode)
869
870
871 class _TimeoutStrategyWrapper:
872   def __init__(self, fn):
873     """Initializes this class.
874
875     """
876     self._fn = fn
877     self._next = None
878
879   def _Advance(self):
880     """Gets the next timeout if necessary.
881
882     """
883     if self._next is None:
884       self._next = self._fn()
885
886   def Peek(self):
887     """Returns the next timeout.
888
889     """
890     self._Advance()
891     return self._next
892
893   def Next(self):
894     """Returns the current timeout and advances the internal state.
895
896     """
897     self._Advance()
898     result = self._next
899     self._next = None
900     return result
901
902
903 class _OpExecContext:
904   def __init__(self, op, index, log_prefix, timeout_strategy_factory):
905     """Initializes this class.
906
907     """
908     self.op = op
909     self.index = index
910     self.log_prefix = log_prefix
911     self.summary = op.input.Summary()
912
913     # Create local copy to modify
914     if getattr(op.input, opcodes_base.DEPEND_ATTR, None):
915       self.jobdeps = op.input.depends[:]
916     else:
917       self.jobdeps = None
918
919     self._timeout_strategy_factory = timeout_strategy_factory
920     self._ResetTimeoutStrategy()
921
922   def _ResetTimeoutStrategy(self):
923     """Creates a new timeout strategy.
924
925     """
926     self._timeout_strategy = \
927       _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
928
929   def CheckPriorityIncrease(self):
930     """Checks whether priority can and should be increased.
931
932     Called when locks couldn't be acquired.
933
934     """
935     op = self.op
936
937     # Exhausted all retries and next round should not use blocking acquire
938     # for locks?
939     if (self._timeout_strategy.Peek() is None and
940         op.priority > constants.OP_PRIO_HIGHEST):
941       logging.debug("Increasing priority")
942       op.priority -= 1
943       self._ResetTimeoutStrategy()
944       return True
945
946     return False
947
948   def GetNextLockTimeout(self):
949     """Returns the next lock acquire timeout.
950
951     """
952     return self._timeout_strategy.Next()
953
954
955 class _JobProcessor(object):
956   (DEFER,
957    WAITDEP,
958    FINISHED) = range(1, 4)
959
960   def __init__(self, queue, opexec_fn, job,
961                _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
962     """Initializes this class.
963
964     """
965     self.queue = queue
966     self.opexec_fn = opexec_fn
967     self.job = job
968     self._timeout_strategy_factory = _timeout_strategy_factory
969
970   @staticmethod
971   def _FindNextOpcode(job, timeout_strategy_factory):
972     """Locates the next opcode to run.
973
974     @type job: L{_QueuedJob}
975     @param job: Job object
976     @param timeout_strategy_factory: Callable to create new timeout strategy
977
978     """
979     # Create some sort of a cache to speed up locating next opcode for future
980     # lookups
981     # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
982     # pending and one for processed ops.
983     if job.ops_iter is None:
984       job.ops_iter = enumerate(job.ops)
985
986     # Find next opcode to run
987     while True:
988       try:
989         (idx, op) = job.ops_iter.next()
990       except StopIteration:
991         raise errors.ProgrammerError("Called for a finished job")
992
993       if op.status == constants.OP_STATUS_RUNNING:
994         # Found an opcode already marked as running
995         raise errors.ProgrammerError("Called for job marked as running")
996
997       opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
998                              timeout_strategy_factory)
999
1000       if op.status not in constants.OPS_FINALIZED:
1001         return opctx
1002
1003       # This is a job that was partially completed before master daemon
1004       # shutdown, so it can be expected that some opcodes are already
1005       # completed successfully (if any did error out, then the whole job
1006       # should have been aborted and not resubmitted for processing).
1007       logging.info("%s: opcode %s already processed, skipping",
1008                    opctx.log_prefix, opctx.summary)
1009
1010   @staticmethod
1011   def _MarkWaitlock(job, op):
1012     """Marks an opcode as waiting for locks.
1013
1014     The job's start timestamp is also set if necessary.
1015
1016     @type job: L{_QueuedJob}
1017     @param job: Job object
1018     @type op: L{_QueuedOpCode}
1019     @param op: Opcode object
1020
1021     """
1022     assert op in job.ops
1023     assert op.status in (constants.OP_STATUS_QUEUED,
1024                          constants.OP_STATUS_WAITING)
1025
1026     update = False
1027
1028     op.result = None
1029
1030     if op.status == constants.OP_STATUS_QUEUED:
1031       op.status = constants.OP_STATUS_WAITING
1032       update = True
1033
1034     if op.start_timestamp is None:
1035       op.start_timestamp = TimeStampNow()
1036       update = True
1037
1038     if job.start_timestamp is None:
1039       job.start_timestamp = op.start_timestamp
1040       update = True
1041
1042     assert op.status == constants.OP_STATUS_WAITING
1043
1044     return update
1045
1046   @staticmethod
1047   def _CheckDependencies(queue, job, opctx):
1048     """Checks if an opcode has dependencies and if so, processes them.
1049
1050     @type queue: L{JobQueue}
1051     @param queue: Queue object
1052     @type job: L{_QueuedJob}
1053     @param job: Job object
1054     @type opctx: L{_OpExecContext}
1055     @param opctx: Opcode execution context
1056     @rtype: bool
1057     @return: Whether opcode will be re-scheduled by dependency tracker
1058
1059     """
1060     op = opctx.op
1061
1062     result = False
1063
1064     while opctx.jobdeps:
1065       (dep_job_id, dep_status) = opctx.jobdeps[0]
1066
1067       (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
1068                                                           dep_status)
1069       assert ht.TNonEmptyString(depmsg), "No dependency message"
1070
1071       logging.info("%s: %s", opctx.log_prefix, depmsg)
1072
1073       if depresult == _JobDependencyManager.CONTINUE:
1074         # Remove dependency and continue
1075         opctx.jobdeps.pop(0)
1076
1077       elif depresult == _JobDependencyManager.WAIT:
1078         # Need to wait for notification, dependency tracker will re-add job
1079         # to workerpool
1080         result = True
1081         break
1082
1083       elif depresult == _JobDependencyManager.CANCEL:
1084         # Job was cancelled, cancel this job as well
1085         job.Cancel()
1086         assert op.status == constants.OP_STATUS_CANCELING
1087         break
1088
1089       elif depresult in (_JobDependencyManager.WRONGSTATUS,
1090                          _JobDependencyManager.ERROR):
1091         # Job failed or there was an error, this job must fail
1092         op.status = constants.OP_STATUS_ERROR
1093         op.result = _EncodeOpError(errors.OpExecError(depmsg))
1094         break
1095
1096       else:
1097         raise errors.ProgrammerError("Unknown dependency result '%s'" %
1098                                      depresult)
1099
1100     return result
1101
1102   def _ExecOpCodeUnlocked(self, opctx):
1103     """Processes one opcode and returns the result.
1104
1105     """
1106     op = opctx.op
1107
1108     assert op.status == constants.OP_STATUS_WAITING
1109
1110     timeout = opctx.GetNextLockTimeout()
1111
1112     try:
1113       # Make sure not to hold queue lock while calling ExecOpCode
1114       result = self.opexec_fn(op.input,
1115                               _OpExecCallbacks(self.queue, self.job, op),
1116                               timeout=timeout)
1117     except mcpu.LockAcquireTimeout:
1118       assert timeout is not None, "Received timeout for blocking acquire"
1119       logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1120
1121       assert op.status in (constants.OP_STATUS_WAITING,
1122                            constants.OP_STATUS_CANCELING)
1123
1124       # Was job cancelled while we were waiting for the lock?
1125       if op.status == constants.OP_STATUS_CANCELING:
1126         return (constants.OP_STATUS_CANCELING, None)
1127
1128       # Queue is shutting down, return to queued
1129       if not self.queue.AcceptingJobsUnlocked():
1130         return (constants.OP_STATUS_QUEUED, None)
1131
1132       # Stay in waitlock while trying to re-acquire lock
1133       return (constants.OP_STATUS_WAITING, None)
1134     except CancelJob:
1135       logging.exception("%s: Canceling job", opctx.log_prefix)
1136       assert op.status == constants.OP_STATUS_CANCELING
1137       return (constants.OP_STATUS_CANCELING, None)
1138
1139     except QueueShutdown:
1140       logging.exception("%s: Queue is shutting down", opctx.log_prefix)
1141
1142       assert op.status == constants.OP_STATUS_WAITING
1143
1144       # Job hadn't been started yet, so it should return to the queue
1145       return (constants.OP_STATUS_QUEUED, None)
1146
1147     except Exception, err: # pylint: disable=W0703
1148       logging.exception("%s: Caught exception in %s",
1149                         opctx.log_prefix, opctx.summary)
1150       return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1151     else:
1152       logging.debug("%s: %s successful",
1153                     opctx.log_prefix, opctx.summary)
1154       return (constants.OP_STATUS_SUCCESS, result)
1155
1156   def __call__(self, _nextop_fn=None):
1157     """Continues execution of a job.
1158
1159     @param _nextop_fn: Callback function for tests
1160     @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1161       be deferred and C{WAITDEP} if the dependency manager
1162       (L{_JobDependencyManager}) will re-schedule the job when appropriate
1163
1164     """
1165     queue = self.queue
1166     job = self.job
1167
1168     logging.debug("Processing job %s", job.id)
1169
1170     queue.acquire(shared=1)
1171     try:
1172       opcount = len(job.ops)
1173
1174       assert job.writable, "Expected writable job"
1175
1176       # Don't do anything for finalized jobs
1177       if job.CalcStatus() in constants.JOBS_FINALIZED:
1178         return self.FINISHED
1179
1180       # Is a previous opcode still pending?
1181       if job.cur_opctx:
1182         opctx = job.cur_opctx
1183         job.cur_opctx = None
1184       else:
1185         if __debug__ and _nextop_fn:
1186           _nextop_fn()
1187         opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1188
1189       op = opctx.op
1190
1191       # Consistency check
1192       assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1193                                      constants.OP_STATUS_CANCELING)
1194                         for i in job.ops[opctx.index + 1:])
1195
1196       assert op.status in (constants.OP_STATUS_QUEUED,
1197                            constants.OP_STATUS_WAITING,
1198                            constants.OP_STATUS_CANCELING)
1199
1200       assert (op.priority <= constants.OP_PRIO_LOWEST and
1201               op.priority >= constants.OP_PRIO_HIGHEST)
1202
1203       waitjob = None
1204
1205       if op.status != constants.OP_STATUS_CANCELING:
1206         assert op.status in (constants.OP_STATUS_QUEUED,
1207                              constants.OP_STATUS_WAITING)
1208
1209         # Prepare to start opcode
1210         if self._MarkWaitlock(job, op):
1211           # Write to disk
1212           queue.UpdateJobUnlocked(job)
1213
1214         assert op.status == constants.OP_STATUS_WAITING
1215         assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1216         assert job.start_timestamp and op.start_timestamp
1217         assert waitjob is None
1218
1219         # Check if waiting for a job is necessary
1220         waitjob = self._CheckDependencies(queue, job, opctx)
1221
1222         assert op.status in (constants.OP_STATUS_WAITING,
1223                              constants.OP_STATUS_CANCELING,
1224                              constants.OP_STATUS_ERROR)
1225
1226         if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1227                                          constants.OP_STATUS_ERROR)):
1228           logging.info("%s: opcode %s waiting for locks",
1229                        opctx.log_prefix, opctx.summary)
1230
1231           assert not opctx.jobdeps, "Not all dependencies were removed"
1232
1233           queue.release()
1234           try:
1235             (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1236           finally:
1237             queue.acquire(shared=1)
1238
1239           op.status = op_status
1240           op.result = op_result
1241
1242           assert not waitjob
1243
1244         if op.status in (constants.OP_STATUS_WAITING,
1245                          constants.OP_STATUS_QUEUED):
1246           # waiting: Couldn't get locks in time
1247           # queued: Queue is shutting down
1248           assert not op.end_timestamp
1249         else:
1250           # Finalize opcode
1251           op.end_timestamp = TimeStampNow()
1252
1253           if op.status == constants.OP_STATUS_CANCELING:
1254             assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1255                                   for i in job.ops[opctx.index:])
1256           else:
1257             assert op.status in constants.OPS_FINALIZED
1258
1259       if op.status == constants.OP_STATUS_QUEUED:
1260         # Queue is shutting down
1261         assert not waitjob
1262
1263         finalize = False
1264
1265         # Reset context
1266         job.cur_opctx = None
1267
1268         # In no case must the status be finalized here
1269         assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1270
1271       elif op.status == constants.OP_STATUS_WAITING or waitjob:
1272         finalize = False
1273
1274         if not waitjob and opctx.CheckPriorityIncrease():
1275           # Priority was changed, need to update on-disk file
1276           queue.UpdateJobUnlocked(job)
1277
1278         # Keep around for another round
1279         job.cur_opctx = opctx
1280
1281         assert (op.priority <= constants.OP_PRIO_LOWEST and
1282                 op.priority >= constants.OP_PRIO_HIGHEST)
1283
1284         # In no case must the status be finalized here
1285         assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1286
1287       else:
1288         # Ensure all opcodes so far have been successful
1289         assert (opctx.index == 0 or
1290                 compat.all(i.status == constants.OP_STATUS_SUCCESS
1291                            for i in job.ops[:opctx.index]))
1292
1293         # Reset context
1294         job.cur_opctx = None
1295
1296         if op.status == constants.OP_STATUS_SUCCESS:
1297           finalize = False
1298
1299         elif op.status == constants.OP_STATUS_ERROR:
1300           # Ensure failed opcode has an exception as its result
1301           assert errors.GetEncodedError(job.ops[opctx.index].result)
1302
1303           to_encode = errors.OpExecError("Preceding opcode failed")
1304           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1305                                 _EncodeOpError(to_encode))
1306           finalize = True
1307
1308           # Consistency check
1309           assert compat.all(i.status == constants.OP_STATUS_ERROR and
1310                             errors.GetEncodedError(i.result)
1311                             for i in job.ops[opctx.index:])
1312
1313         elif op.status == constants.OP_STATUS_CANCELING:
1314           job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1315                                 "Job canceled by request")
1316           finalize = True
1317
1318         else:
1319           raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1320
1321         if opctx.index == (opcount - 1):
1322           # Finalize on last opcode
1323           finalize = True
1324
1325         if finalize:
1326           # All opcodes have been run, finalize job
1327           job.Finalize()
1328
1329         # Write to disk. If the job status is final, this is the final write
1330         # allowed. Once the file has been written, it can be archived anytime.
1331         queue.UpdateJobUnlocked(job)
1332
1333         assert not waitjob
1334
1335         if finalize:
1336           logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1337           return self.FINISHED
1338
1339       assert not waitjob or queue.depmgr.JobWaiting(job)
1340
1341       if waitjob:
1342         return self.WAITDEP
1343       else:
1344         return self.DEFER
1345     finally:
1346       assert job.writable, "Job became read-only while being processed"
1347       queue.release()
1348
1349
1350 def _EvaluateJobProcessorResult(depmgr, job, result):
1351   """Looks at a result from L{_JobProcessor} for a job.
1352
1353   To be used in a L{_JobQueueWorker}.
1354
1355   """
1356   if result == _JobProcessor.FINISHED:
1357     # Notify waiting jobs
1358     depmgr.NotifyWaiters(job.id)
1359
1360   elif result == _JobProcessor.DEFER:
1361     # Schedule again
1362     raise workerpool.DeferTask(priority=job.CalcPriority())
1363
1364   elif result == _JobProcessor.WAITDEP:
1365     # No-op, dependency manager will re-schedule
1366     pass
1367
1368   else:
1369     raise errors.ProgrammerError("Job processor returned unknown status %s" %
1370                                  (result, ))
1371
1372
1373 class _JobQueueWorker(workerpool.BaseWorker):
1374   """The actual job workers.
1375
1376   """
1377   def RunTask(self, job): # pylint: disable=W0221
1378     """Job executor.
1379
1380     @type job: L{_QueuedJob}
1381     @param job: the job to be processed
1382
1383     """
1384     assert job.writable, "Expected writable job"
1385
1386     # Ensure only one worker is active on a single job. If a job registers for
1387     # a dependency job, and the other job notifies before the first worker is
1388     # done, the job can end up in the tasklist more than once.
1389     job.processor_lock.acquire()
1390     try:
1391       return self._RunTaskInner(job)
1392     finally:
1393       job.processor_lock.release()
1394
1395   def _RunTaskInner(self, job):
1396     """Executes a job.
1397
1398     Must be called with per-job lock acquired.
1399
1400     """
1401     queue = job.queue
1402     assert queue == self.pool.queue
1403
1404     setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1405     setname_fn(None)
1406
1407     proc = mcpu.Processor(queue.context, job.id)
1408
1409     # Create wrapper for setting thread name
1410     wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1411                                     proc.ExecOpCode)
1412
1413     _EvaluateJobProcessorResult(queue.depmgr, job,
1414                                 _JobProcessor(queue, wrap_execop_fn, job)())
1415
1416   @staticmethod
1417   def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1418     """Updates the worker thread name to include a short summary of the opcode.
1419
1420     @param setname_fn: Callable setting worker thread name
1421     @param execop_fn: Callable for executing opcode (usually
1422                       L{mcpu.Processor.ExecOpCode})
1423
1424     """
1425     setname_fn(op)
1426     try:
1427       return execop_fn(op, *args, **kwargs)
1428     finally:
1429       setname_fn(None)
1430
1431   @staticmethod
1432   def _GetWorkerName(job, op):
1433     """Sets the worker thread name.
1434
1435     @type job: L{_QueuedJob}
1436     @type op: L{opcodes.OpCode}
1437
1438     """
1439     parts = ["Job%s" % job.id]
1440
1441     if op:
1442       parts.append(op.TinySummary())
1443
1444     return "/".join(parts)
1445
1446
1447 class _JobQueueWorkerPool(workerpool.WorkerPool):
1448   """Simple class implementing a job-processing workerpool.
1449
1450   """
1451   def __init__(self, queue):
1452     super(_JobQueueWorkerPool, self).__init__("Jq",
1453                                               JOBQUEUE_THREADS,
1454                                               _JobQueueWorker)
1455     self.queue = queue
1456
1457
1458 class _JobDependencyManager:
1459   """Keeps track of job dependencies.
1460
1461   """
1462   (WAIT,
1463    ERROR,
1464    CANCEL,
1465    CONTINUE,
1466    WRONGSTATUS) = range(1, 6)
1467
1468   def __init__(self, getstatus_fn, enqueue_fn):
1469     """Initializes this class.
1470
1471     """
1472     self._getstatus_fn = getstatus_fn
1473     self._enqueue_fn = enqueue_fn
1474
1475     self._waiters = {}
1476     self._lock = locking.SharedLock("JobDepMgr")
1477
1478   @locking.ssynchronized(_LOCK, shared=1)
1479   def GetLockInfo(self, requested): # pylint: disable=W0613
1480     """Retrieves information about waiting jobs.
1481
1482     @type requested: set
1483     @param requested: Requested information, see C{query.LQ_*}
1484
1485     """
1486     # No need to sort here, that's being done by the lock manager and query
1487     # library. There are no priorities for notifying jobs, hence all show up as
1488     # one item under "pending".
1489     return [("job/%s" % job_id, None, None,
1490              [("job", [job.id for job in waiters])])
1491             for job_id, waiters in self._waiters.items()
1492             if waiters]
1493
1494   @locking.ssynchronized(_LOCK, shared=1)
1495   def JobWaiting(self, job):
1496     """Checks if a job is waiting.
1497
1498     """
1499     return compat.any(job in jobs
1500                       for jobs in self._waiters.values())
1501
1502   @locking.ssynchronized(_LOCK)
1503   def CheckAndRegister(self, job, dep_job_id, dep_status):
1504     """Checks if a dependency job has the requested status.
1505
1506     If the other job is not yet in a finalized status, the calling job will be
1507     notified (re-added to the workerpool) at a later point.
1508
1509     @type job: L{_QueuedJob}
1510     @param job: Job object
1511     @type dep_job_id: int
1512     @param dep_job_id: ID of dependency job
1513     @type dep_status: list
1514     @param dep_status: Required status
1515
1516     """
1517     assert ht.TJobId(job.id)
1518     assert ht.TJobId(dep_job_id)
1519     assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1520
1521     if job.id == dep_job_id:
1522       return (self.ERROR, "Job can't depend on itself")
1523
1524     # Get status of dependency job
1525     try:
1526       status = self._getstatus_fn(dep_job_id)
1527     except errors.JobLost, err:
1528       return (self.ERROR, "Dependency error: %s" % err)
1529
1530     assert status in constants.JOB_STATUS_ALL
1531
1532     job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1533
1534     if status not in constants.JOBS_FINALIZED:
1535       # Register for notification and wait for job to finish
1536       job_id_waiters.add(job)
1537       return (self.WAIT,
1538               "Need to wait for job %s, wanted status '%s'" %
1539               (dep_job_id, dep_status))
1540
1541     # Remove from waiters list
1542     if job in job_id_waiters:
1543       job_id_waiters.remove(job)
1544
1545     if (status == constants.JOB_STATUS_CANCELED and
1546         constants.JOB_STATUS_CANCELED not in dep_status):
1547       return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1548
1549     elif not dep_status or status in dep_status:
1550       return (self.CONTINUE,
1551               "Dependency job %s finished with status '%s'" %
1552               (dep_job_id, status))
1553
1554     else:
1555       return (self.WRONGSTATUS,
1556               "Dependency job %s finished with status '%s',"
1557               " not one of '%s' as required" %
1558               (dep_job_id, status, utils.CommaJoin(dep_status)))
1559
1560   def _RemoveEmptyWaitersUnlocked(self):
1561     """Remove all jobs without actual waiters.
1562
1563     """
1564     for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1565                    if not waiters]:
1566       del self._waiters[job_id]
1567
1568   def NotifyWaiters(self, job_id):
1569     """Notifies all jobs waiting for a certain job ID.
1570
1571     @attention: Do not call until L{CheckAndRegister} returned a status other
1572       than C{WAITDEP} for C{job_id}, or behaviour is undefined
1573     @type job_id: int
1574     @param job_id: Job ID
1575
1576     """
1577     assert ht.TJobId(job_id)
1578
1579     self._lock.acquire()
1580     try:
1581       self._RemoveEmptyWaitersUnlocked()
1582
1583       jobs = self._waiters.pop(job_id, None)
1584     finally:
1585       self._lock.release()
1586
1587     if jobs:
1588       # Re-add jobs to workerpool
1589       logging.debug("Re-adding %s jobs which were waiting for job %s",
1590                     len(jobs), job_id)
1591       self._enqueue_fn(jobs)
1592
1593
1594 def _RequireOpenQueue(fn):
1595   """Decorator for "public" functions.
1596
1597   This function should be used for all 'public' functions. That is,
1598   functions usually called from other classes. Note that this should
1599   be applied only to methods (not plain functions), since it expects
1600   that the decorated function is called with a first argument that has
1601   a '_queue_filelock' argument.
1602
1603   @warning: Use this decorator only after locking.ssynchronized
1604
1605   Example::
1606     @locking.ssynchronized(_LOCK)
1607     @_RequireOpenQueue
1608     def Example(self):
1609       pass
1610
1611   """
1612   def wrapper(self, *args, **kwargs):
1613     # pylint: disable=W0212
1614     assert self._queue_filelock is not None, "Queue should be open"
1615     return fn(self, *args, **kwargs)
1616   return wrapper
1617
1618
1619 def _RequireNonDrainedQueue(fn):
1620   """Decorator checking for a non-drained queue.
1621
1622   To be used with functions submitting new jobs.
1623
1624   """
1625   def wrapper(self, *args, **kwargs):
1626     """Wrapper function.
1627
1628     @raise errors.JobQueueDrainError: if the job queue is marked for draining
1629
1630     """
1631     # Ok when sharing the big job queue lock, as the drain file is created when
1632     # the lock is exclusive.
1633     # Needs access to protected member, pylint: disable=W0212
1634     if self._drained:
1635       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1636
1637     if not self._accepting_jobs:
1638       raise errors.JobQueueError("Job queue is shutting down, refusing job")
1639
1640     return fn(self, *args, **kwargs)
1641   return wrapper
1642
1643
1644 class JobQueue(object):
1645   """Queue used to manage the jobs.
1646
1647   """
1648   def __init__(self, context):
1649     """Constructor for JobQueue.
1650
1651     The constructor will initialize the job queue object and then
1652     start loading the current jobs from disk, either for starting them
1653     (if they were queue) or for aborting them (if they were already
1654     running).
1655
1656     @type context: GanetiContext
1657     @param context: the context object for access to the configuration
1658         data and other ganeti objects
1659
1660     """
1661     self.context = context
1662     self._memcache = weakref.WeakValueDictionary()
1663     self._my_hostname = netutils.Hostname.GetSysName()
1664
1665     # The Big JobQueue lock. If a code block or method acquires it in shared
1666     # mode safe it must guarantee concurrency with all the code acquiring it in
1667     # shared mode, including itself. In order not to acquire it at all
1668     # concurrency must be guaranteed with all code acquiring it in shared mode
1669     # and all code acquiring it exclusively.
1670     self._lock = locking.SharedLock("JobQueue")
1671
1672     self.acquire = self._lock.acquire
1673     self.release = self._lock.release
1674
1675     # Accept jobs by default
1676     self._accepting_jobs = True
1677
1678     # Initialize the queue, and acquire the filelock.
1679     # This ensures no other process is working on the job queue.
1680     self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1681
1682     # Read serial file
1683     self._last_serial = jstore.ReadSerial()
1684     assert self._last_serial is not None, ("Serial file was modified between"
1685                                            " check in jstore and here")
1686
1687     # Get initial list of nodes
1688     self._nodes = dict((n.name, n.primary_ip)
1689                        for n in self.context.cfg.GetAllNodesInfo().values()
1690                        if n.master_candidate)
1691
1692     # Remove master node
1693     self._nodes.pop(self._my_hostname, None)
1694
1695     # TODO: Check consistency across nodes
1696
1697     self._queue_size = None
1698     self._UpdateQueueSizeUnlocked()
1699     assert ht.TInt(self._queue_size)
1700     self._drained = jstore.CheckDrainFlag()
1701
1702     # Job dependencies
1703     self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1704                                         self._EnqueueJobs)
1705     self.context.glm.AddToLockMonitor(self.depmgr)
1706
1707     # Setup worker pool
1708     self._wpool = _JobQueueWorkerPool(self)
1709     try:
1710       self._InspectQueue()
1711     except:
1712       self._wpool.TerminateWorkers()
1713       raise
1714
1715   @locking.ssynchronized(_LOCK)
1716   @_RequireOpenQueue
1717   def _InspectQueue(self):
1718     """Loads the whole job queue and resumes unfinished jobs.
1719
1720     This function needs the lock here because WorkerPool.AddTask() may start a
1721     job while we're still doing our work.
1722
1723     """
1724     logging.info("Inspecting job queue")
1725
1726     restartjobs = []
1727
1728     all_job_ids = self._GetJobIDsUnlocked()
1729     jobs_count = len(all_job_ids)
1730     lastinfo = time.time()
1731     for idx, job_id in enumerate(all_job_ids):
1732       # Give an update every 1000 jobs or 10 seconds
1733       if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1734           idx == (jobs_count - 1)):
1735         logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1736                      idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1737         lastinfo = time.time()
1738
1739       job = self._LoadJobUnlocked(job_id)
1740
1741       # a failure in loading the job can cause 'None' to be returned
1742       if job is None:
1743         continue
1744
1745       status = job.CalcStatus()
1746
1747       if status == constants.JOB_STATUS_QUEUED:
1748         restartjobs.append(job)
1749
1750       elif status in (constants.JOB_STATUS_RUNNING,
1751                       constants.JOB_STATUS_WAITING,
1752                       constants.JOB_STATUS_CANCELING):
1753         logging.warning("Unfinished job %s found: %s", job.id, job)
1754
1755         if status == constants.JOB_STATUS_WAITING:
1756           # Restart job
1757           job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1758           restartjobs.append(job)
1759         else:
1760           to_encode = errors.OpExecError("Unclean master daemon shutdown")
1761           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1762                                 _EncodeOpError(to_encode))
1763           job.Finalize()
1764
1765         self.UpdateJobUnlocked(job)
1766
1767     if restartjobs:
1768       logging.info("Restarting %s jobs", len(restartjobs))
1769       self._EnqueueJobsUnlocked(restartjobs)
1770
1771     logging.info("Job queue inspection finished")
1772
1773   def _GetRpc(self, address_list):
1774     """Gets RPC runner with context.
1775
1776     """
1777     return rpc.JobQueueRunner(self.context, address_list)
1778
1779   @locking.ssynchronized(_LOCK)
1780   @_RequireOpenQueue
1781   def AddNode(self, node):
1782     """Register a new node with the queue.
1783
1784     @type node: L{objects.Node}
1785     @param node: the node object to be added
1786
1787     """
1788     node_name = node.name
1789     assert node_name != self._my_hostname
1790
1791     # Clean queue directory on added node
1792     result = self._GetRpc(None).call_jobqueue_purge(node_name)
1793     msg = result.fail_msg
1794     if msg:
1795       logging.warning("Cannot cleanup queue directory on node %s: %s",
1796                       node_name, msg)
1797
1798     if not node.master_candidate:
1799       # remove if existing, ignoring errors
1800       self._nodes.pop(node_name, None)
1801       # and skip the replication of the job ids
1802       return
1803
1804     # Upload the whole queue excluding archived jobs
1805     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1806
1807     # Upload current serial file
1808     files.append(pathutils.JOB_QUEUE_SERIAL_FILE)
1809
1810     # Static address list
1811     addrs = [node.primary_ip]
1812
1813     for file_name in files:
1814       # Read file content
1815       content = utils.ReadFile(file_name)
1816
1817       result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
1818                              file_name, content)
1819       msg = result[node_name].fail_msg
1820       if msg:
1821         logging.error("Failed to upload file %s to node %s: %s",
1822                       file_name, node_name, msg)
1823
1824     # Set queue drained flag
1825     result = \
1826       self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name],
1827                                                        self._drained)
1828     msg = result[node_name].fail_msg
1829     if msg:
1830       logging.error("Failed to set queue drained flag on node %s: %s",
1831                     node_name, msg)
1832
1833     self._nodes[node_name] = node.primary_ip
1834
1835   @locking.ssynchronized(_LOCK)
1836   @_RequireOpenQueue
1837   def RemoveNode(self, node_name):
1838     """Callback called when removing nodes from the cluster.
1839
1840     @type node_name: str
1841     @param node_name: the name of the node to remove
1842
1843     """
1844     self._nodes.pop(node_name, None)
1845
1846   @staticmethod
1847   def _CheckRpcResult(result, nodes, failmsg):
1848     """Verifies the status of an RPC call.
1849
1850     Since we aim to keep consistency should this node (the current
1851     master) fail, we will log errors if our rpc fail, and especially
1852     log the case when more than half of the nodes fails.
1853
1854     @param result: the data as returned from the rpc call
1855     @type nodes: list
1856     @param nodes: the list of nodes we made the call to
1857     @type failmsg: str
1858     @param failmsg: the identifier to be used for logging
1859
1860     """
1861     failed = []
1862     success = []
1863
1864     for node in nodes:
1865       msg = result[node].fail_msg
1866       if msg:
1867         failed.append(node)
1868         logging.error("RPC call %s (%s) failed on node %s: %s",
1869                       result[node].call, failmsg, node, msg)
1870       else:
1871         success.append(node)
1872
1873     # +1 for the master node
1874     if (len(success) + 1) < len(failed):
1875       # TODO: Handle failing nodes
1876       logging.error("More than half of the nodes failed")
1877
1878   def _GetNodeIp(self):
1879     """Helper for returning the node name/ip list.
1880
1881     @rtype: (list, list)
1882     @return: a tuple of two lists, the first one with the node
1883         names and the second one with the node addresses
1884
1885     """
1886     # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1887     name_list = self._nodes.keys()
1888     addr_list = [self._nodes[name] for name in name_list]
1889     return name_list, addr_list
1890
1891   def _UpdateJobQueueFile(self, file_name, data, replicate):
1892     """Writes a file locally and then replicates it to all nodes.
1893
1894     This function will replace the contents of a file on the local
1895     node and then replicate it to all the other nodes we have.
1896
1897     @type file_name: str
1898     @param file_name: the path of the file to be replicated
1899     @type data: str
1900     @param data: the new contents of the file
1901     @type replicate: boolean
1902     @param replicate: whether to spread the changes to the remote nodes
1903
1904     """
1905     getents = runtime.GetEnts()
1906     utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1907                     gid=getents.daemons_gid,
1908                     mode=constants.JOB_QUEUE_FILES_PERMS)
1909
1910     if replicate:
1911       names, addrs = self._GetNodeIp()
1912       result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
1913       self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1914
1915   def _RenameFilesUnlocked(self, rename):
1916     """Renames a file locally and then replicate the change.
1917
1918     This function will rename a file in the local queue directory
1919     and then replicate this rename to all the other nodes we have.
1920
1921     @type rename: list of (old, new)
1922     @param rename: List containing tuples mapping old to new names
1923
1924     """
1925     # Rename them locally
1926     for old, new in rename:
1927       utils.RenameFile(old, new, mkdir=True)
1928
1929     # ... and on all nodes
1930     names, addrs = self._GetNodeIp()
1931     result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
1932     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1933
1934   def _NewSerialsUnlocked(self, count):
1935     """Generates a new job identifier.
1936
1937     Job identifiers are unique during the lifetime of a cluster.
1938
1939     @type count: integer
1940     @param count: how many serials to return
1941     @rtype: list of int
1942     @return: a list of job identifiers.
1943
1944     """
1945     assert ht.TNonNegativeInt(count)
1946
1947     # New number
1948     serial = self._last_serial + count
1949
1950     # Write to file
1951     self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
1952                              "%s\n" % serial, True)
1953
1954     result = [jstore.FormatJobID(v)
1955               for v in range(self._last_serial + 1, serial + 1)]
1956
1957     # Keep it only if we were able to write the file
1958     self._last_serial = serial
1959
1960     assert len(result) == count
1961
1962     return result
1963
1964   @staticmethod
1965   def _GetJobPath(job_id):
1966     """Returns the job file for a given job id.
1967
1968     @type job_id: str
1969     @param job_id: the job identifier
1970     @rtype: str
1971     @return: the path to the job file
1972
1973     """
1974     return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
1975
1976   @staticmethod
1977   def _GetArchivedJobPath(job_id):
1978     """Returns the archived job file for a give job id.
1979
1980     @type job_id: str
1981     @param job_id: the job identifier
1982     @rtype: str
1983     @return: the path to the archived job file
1984
1985     """
1986     return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
1987                           jstore.GetArchiveDirectory(job_id),
1988                           "job-%s" % job_id)
1989
1990   @staticmethod
1991   def _DetermineJobDirectories(archived):
1992     """Build list of directories containing job files.
1993
1994     @type archived: bool
1995     @param archived: Whether to include directories for archived jobs
1996     @rtype: list
1997
1998     """
1999     result = [pathutils.QUEUE_DIR]
2000
2001     if archived:
2002       archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
2003       result.extend(map(compat.partial(utils.PathJoin, archive_path),
2004                         utils.ListVisibleFiles(archive_path)))
2005
2006     return result
2007
2008   @classmethod
2009   def _GetJobIDsUnlocked(cls, sort=True, archived=False):
2010     """Return all known job IDs.
2011
2012     The method only looks at disk because it's a requirement that all
2013     jobs are present on disk (so in the _memcache we don't have any
2014     extra IDs).
2015
2016     @type sort: boolean
2017     @param sort: perform sorting on the returned job ids
2018     @rtype: list
2019     @return: the list of job IDs
2020
2021     """
2022     jlist = []
2023
2024     for path in cls._DetermineJobDirectories(archived):
2025       for filename in utils.ListVisibleFiles(path):
2026         m = constants.JOB_FILE_RE.match(filename)
2027         if m:
2028           jlist.append(int(m.group(1)))
2029
2030     if sort:
2031       jlist.sort()
2032     return jlist
2033
2034   def _LoadJobUnlocked(self, job_id):
2035     """Loads a job from the disk or memory.
2036
2037     Given a job id, this will return the cached job object if
2038     existing, or try to load the job from the disk. If loading from
2039     disk, it will also add the job to the cache.
2040
2041     @type job_id: int
2042     @param job_id: the job id
2043     @rtype: L{_QueuedJob} or None
2044     @return: either None or the job object
2045
2046     """
2047     assert isinstance(job_id, int), "Job queue: Supplied job id is not an int!"
2048
2049     job = self._memcache.get(job_id, None)
2050     if job:
2051       logging.debug("Found job %s in memcache", job_id)
2052       assert job.writable, "Found read-only job in memcache"
2053       return job
2054
2055     try:
2056       job = self._LoadJobFromDisk(job_id, False)
2057       if job is None:
2058         return job
2059     except errors.JobFileCorrupted:
2060       old_path = self._GetJobPath(job_id)
2061       new_path = self._GetArchivedJobPath(job_id)
2062       if old_path == new_path:
2063         # job already archived (future case)
2064         logging.exception("Can't parse job %s", job_id)
2065       else:
2066         # non-archived case
2067         logging.exception("Can't parse job %s, will archive.", job_id)
2068         self._RenameFilesUnlocked([(old_path, new_path)])
2069       return None
2070
2071     assert job.writable, "Job just loaded is not writable"
2072
2073     self._memcache[job_id] = job
2074     logging.debug("Added job %s to the cache", job_id)
2075     return job
2076
2077   def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
2078     """Load the given job file from disk.
2079
2080     Given a job file, read, load and restore it in a _QueuedJob format.
2081
2082     @type job_id: int
2083     @param job_id: job identifier
2084     @type try_archived: bool
2085     @param try_archived: Whether to try loading an archived job
2086     @rtype: L{_QueuedJob} or None
2087     @return: either None or the job object
2088
2089     """
2090     path_functions = [(self._GetJobPath, False)]
2091
2092     if try_archived:
2093       path_functions.append((self._GetArchivedJobPath, True))
2094
2095     raw_data = None
2096     archived = None
2097
2098     for (fn, archived) in path_functions:
2099       filepath = fn(job_id)
2100       logging.debug("Loading job from %s", filepath)
2101       try:
2102         raw_data = utils.ReadFile(filepath)
2103       except EnvironmentError, err:
2104         if err.errno != errno.ENOENT:
2105           raise
2106       else:
2107         break
2108
2109     if not raw_data:
2110       return None
2111
2112     if writable is None:
2113       writable = not archived
2114
2115     try:
2116       data = serializer.LoadJson(raw_data)
2117       job = _QueuedJob.Restore(self, data, writable, archived)
2118     except Exception, err: # pylint: disable=W0703
2119       raise errors.JobFileCorrupted(err)
2120
2121     return job
2122
2123   def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
2124     """Load the given job file from disk.
2125
2126     Given a job file, read, load and restore it in a _QueuedJob format.
2127     In case of error reading the job, it gets returned as None, and the
2128     exception is logged.
2129
2130     @type job_id: int
2131     @param job_id: job identifier
2132     @type try_archived: bool
2133     @param try_archived: Whether to try loading an archived job
2134     @rtype: L{_QueuedJob} or None
2135     @return: either None or the job object
2136
2137     """
2138     try:
2139       return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
2140     except (errors.JobFileCorrupted, EnvironmentError):
2141       logging.exception("Can't load/parse job %s", job_id)
2142       return None
2143
2144   def _UpdateQueueSizeUnlocked(self):
2145     """Update the queue size.
2146
2147     """
2148     self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
2149
2150   @locking.ssynchronized(_LOCK)
2151   @_RequireOpenQueue
2152   def SetDrainFlag(self, drain_flag):
2153     """Sets the drain flag for the queue.
2154
2155     @type drain_flag: boolean
2156     @param drain_flag: Whether to set or unset the drain flag
2157
2158     """
2159     # Change flag locally
2160     jstore.SetDrainFlag(drain_flag)
2161
2162     self._drained = drain_flag
2163
2164     # ... and on all nodes
2165     (names, addrs) = self._GetNodeIp()
2166     result = \
2167       self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag)
2168     self._CheckRpcResult(result, self._nodes,
2169                          "Setting queue drain flag to %s" % drain_flag)
2170
2171     return True
2172
2173   @_RequireOpenQueue
2174   def _SubmitJobUnlocked(self, job_id, ops):
2175     """Create and store a new job.
2176
2177     This enters the job into our job queue and also puts it on the new
2178     queue, in order for it to be picked up by the queue processors.
2179
2180     @type job_id: job ID
2181     @param job_id: the job ID for the new job
2182     @type ops: list
2183     @param ops: The list of OpCodes that will become the new job.
2184     @rtype: L{_QueuedJob}
2185     @return: the job object to be queued
2186     @raise errors.JobQueueFull: if the job queue has too many jobs in it
2187     @raise errors.GenericError: If an opcode is not valid
2188
2189     """
2190     if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
2191       raise errors.JobQueueFull()
2192
2193     job = _QueuedJob(self, job_id, ops, True)
2194
2195     for idx, op in enumerate(job.ops):
2196       # Check priority
2197       if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
2198         allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2199         raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
2200                                   " are %s" % (idx, op.priority, allowed))
2201
2202       # Check job dependencies
2203       dependencies = getattr(op.input, opcodes_base.DEPEND_ATTR, None)
2204       if not opcodes_base.TNoRelativeJobDependencies(dependencies):
2205         raise errors.GenericError("Opcode %s has invalid dependencies, must"
2206                                   " match %s: %s" %
2207                                   (idx, opcodes_base.TNoRelativeJobDependencies,
2208                                    dependencies))
2209
2210     # Write to disk
2211     self.UpdateJobUnlocked(job)
2212
2213     self._queue_size += 1
2214
2215     logging.debug("Adding new job %s to the cache", job_id)
2216     self._memcache[job_id] = job
2217
2218     return job
2219
2220   @locking.ssynchronized(_LOCK)
2221   @_RequireOpenQueue
2222   @_RequireNonDrainedQueue
2223   def SubmitJob(self, ops):
2224     """Create and store a new job.
2225
2226     @see: L{_SubmitJobUnlocked}
2227
2228     """
2229     (job_id, ) = self._NewSerialsUnlocked(1)
2230     self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
2231     return job_id
2232
2233   @locking.ssynchronized(_LOCK)
2234   @_RequireOpenQueue
2235   def SubmitJobToDrainedQueue(self, ops):
2236     """Forcefully create and store a new job.
2237
2238     Do so, even if the job queue is drained.
2239     @see: L{_SubmitJobUnlocked}
2240
2241     """
2242     (job_id, ) = self._NewSerialsUnlocked(1)
2243     self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
2244     return job_id
2245
2246   @locking.ssynchronized(_LOCK)
2247   @_RequireOpenQueue
2248   @_RequireNonDrainedQueue
2249   def SubmitManyJobs(self, jobs):
2250     """Create and store multiple jobs.
2251
2252     @see: L{_SubmitJobUnlocked}
2253
2254     """
2255     all_job_ids = self._NewSerialsUnlocked(len(jobs))
2256
2257     (results, added_jobs) = \
2258       self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])
2259
2260     self._EnqueueJobsUnlocked(added_jobs)
2261
2262     return results
2263
2264   @staticmethod
2265   def _FormatSubmitError(msg, ops):
2266     """Formats errors which occurred while submitting a job.
2267
2268     """
2269     return ("%s; opcodes %s" %
2270             (msg, utils.CommaJoin(op.Summary() for op in ops)))
2271
2272   @staticmethod
2273   def _ResolveJobDependencies(resolve_fn, deps):
2274     """Resolves relative job IDs in dependencies.
2275
2276     @type resolve_fn: callable
2277     @param resolve_fn: Function to resolve a relative job ID
2278     @type deps: list
2279     @param deps: Dependencies
2280     @rtype: tuple; (boolean, string or list)
2281     @return: If successful (first tuple item), the returned list contains
2282       resolved job IDs along with the requested status; if not successful,
2283       the second element is an error message
2284
2285     """
2286     result = []
2287
2288     for (dep_job_id, dep_status) in deps:
2289       if ht.TRelativeJobId(dep_job_id):
2290         assert ht.TInt(dep_job_id) and dep_job_id < 0
2291         try:
2292           job_id = resolve_fn(dep_job_id)
2293         except IndexError:
2294           # Abort
2295           return (False, "Unable to resolve relative job ID %s" % dep_job_id)
2296       else:
2297         job_id = dep_job_id
2298
2299       result.append((job_id, dep_status))
2300
2301     return (True, result)
2302
2303   def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
2304     """Create and store multiple jobs.
2305
2306     @see: L{_SubmitJobUnlocked}
2307
2308     """
2309     results = []
2310     added_jobs = []
2311
2312     def resolve_fn(job_idx, reljobid):
2313       assert reljobid < 0
2314       return (previous_job_ids + job_ids[:job_idx])[reljobid]
2315
2316     for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
2317       for op in ops:
2318         if getattr(op, opcodes_base.DEPEND_ATTR, None):
2319           (status, data) = \
2320             self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
2321                                          op.depends)
2322           if not status:
2323             # Abort resolving dependencies
2324             assert ht.TNonEmptyString(data), "No error message"
2325             break
2326           # Use resolved dependencies
2327           op.depends = data
2328       else:
2329         try:
2330           job = self._SubmitJobUnlocked(job_id, ops)
2331         except errors.GenericError, err:
2332           status = False
2333           data = self._FormatSubmitError(str(err), ops)
2334         else:
2335           status = True
2336           data = job_id
2337           added_jobs.append(job)
2338
2339       results.append((status, data))
2340
2341     return (results, added_jobs)
2342
2343   @locking.ssynchronized(_LOCK)
2344   def _EnqueueJobs(self, jobs):
2345     """Helper function to add jobs to worker pool's queue.
2346
2347     @type jobs: list
2348     @param jobs: List of all jobs
2349
2350     """
2351     return self._EnqueueJobsUnlocked(jobs)
2352
2353   def _EnqueueJobsUnlocked(self, jobs):
2354     """Helper function to add jobs to worker pool's queue.
2355
2356     @type jobs: list
2357     @param jobs: List of all jobs
2358
2359     """
2360     assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
2361     self._wpool.AddManyTasks([(job, ) for job in jobs],
2362                              priority=[job.CalcPriority() for job in jobs],
2363                              task_id=map(_GetIdAttr, jobs))
2364
2365   def _GetJobStatusForDependencies(self, job_id):
2366     """Gets the status of a job for dependencies.
2367
2368     @type job_id: int
2369     @param job_id: Job ID
2370     @raise errors.JobLost: If job can't be found
2371
2372     """
2373     # Not using in-memory cache as doing so would require an exclusive lock
2374
2375     # Try to load from disk
2376     job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2377
2378     assert not job.writable, "Got writable job" # pylint: disable=E1101
2379
2380     if job:
2381       return job.CalcStatus()
2382
2383     raise errors.JobLost("Job %s not found" % job_id)
2384
2385   @_RequireOpenQueue
2386   def UpdateJobUnlocked(self, job, replicate=True):
2387     """Update a job's on disk storage.
2388
2389     After a job has been modified, this function needs to be called in
2390     order to write the changes to disk and replicate them to the other
2391     nodes.
2392
2393     @type job: L{_QueuedJob}
2394     @param job: the changed job
2395     @type replicate: boolean
2396     @param replicate: whether to replicate the change to remote nodes
2397
2398     """
2399     if __debug__:
2400       finalized = job.CalcStatus() in constants.JOBS_FINALIZED
2401       assert (finalized ^ (job.end_timestamp is None))
2402       assert job.writable, "Can't update read-only job"
2403       assert not job.archived, "Can't update archived job"
2404
2405     filename = self._GetJobPath(job.id)
2406     data = serializer.DumpJson(job.Serialize())
2407     logging.debug("Writing job %s to %s", job.id, filename)
2408     self._UpdateJobQueueFile(filename, data, replicate)
2409
2410   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
2411                         timeout):
2412     """Waits for changes in a job.
2413
2414     @type job_id: int
2415     @param job_id: Job identifier
2416     @type fields: list of strings
2417     @param fields: Which fields to check for changes
2418     @type prev_job_info: list or None
2419     @param prev_job_info: Last job information returned
2420     @type prev_log_serial: int
2421     @param prev_log_serial: Last job message serial number
2422     @type timeout: float
2423     @param timeout: maximum time to wait in seconds
2424     @rtype: tuple (job info, log entries)
2425     @return: a tuple of the job information as required via
2426         the fields parameter, and the log entries as a list
2427
2428         if the job has not changed and the timeout has expired,
2429         we instead return a special value,
2430         L{constants.JOB_NOTCHANGED}, which should be interpreted
2431         as such by the clients
2432
2433     """
2434     load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
2435                              writable=False)
2436
2437     helper = _WaitForJobChangesHelper()
2438
2439     return helper(self._GetJobPath(job_id), load_fn,
2440                   fields, prev_job_info, prev_log_serial, timeout)
2441
2442   @locking.ssynchronized(_LOCK)
2443   @_RequireOpenQueue
2444   def CancelJob(self, job_id):
2445     """Cancels a job.
2446
2447     This will only succeed if the job has not started yet.
2448
2449     @type job_id: int
2450     @param job_id: job ID of job to be cancelled.
2451
2452     """
2453     logging.info("Cancelling job %s", job_id)
2454
2455     return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
2456
2457   @locking.ssynchronized(_LOCK)
2458   @_RequireOpenQueue
2459   def ChangeJobPriority(self, job_id, priority):
2460     """Changes a job's priority.
2461
2462     @type job_id: int
2463     @param job_id: ID of the job whose priority should be changed
2464     @type priority: int
2465     @param priority: New priority
2466
2467     """
2468     logging.info("Changing priority of job %s to %s", job_id, priority)
2469
2470     if priority not in constants.OP_PRIO_SUBMIT_VALID:
2471       allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2472       raise errors.GenericError("Invalid priority %s, allowed are %s" %
2473                                 (priority, allowed))
2474
2475     def fn(job):
2476       (success, msg) = job.ChangePriority(priority)
2477
2478       if success:
2479         try:
2480           self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
2481         except workerpool.NoSuchTask:
2482           logging.debug("Job %s is not in workerpool at this time", job.id)
2483
2484       return (success, msg)
2485
2486     return self._ModifyJobUnlocked(job_id, fn)
2487
2488   def _ModifyJobUnlocked(self, job_id, mod_fn):
2489     """Modifies a job.
2490
2491     @type job_id: int
2492     @param job_id: Job ID
2493     @type mod_fn: callable
2494     @param mod_fn: Modifying function, receiving job object as parameter,
2495       returning tuple of (status boolean, message string)
2496
2497     """
2498     job = self._LoadJobUnlocked(job_id)
2499     if not job:
2500       logging.debug("Job %s not found", job_id)
2501       return (False, "Job %s not found" % job_id)
2502
2503     assert job.writable, "Can't modify read-only job"
2504     assert not job.archived, "Can't modify archived job"
2505
2506     (success, msg) = mod_fn(job)
2507
2508     if success:
2509       # If the job was finalized (e.g. cancelled), this is the final write
2510       # allowed. The job can be archived anytime.
2511       self.UpdateJobUnlocked(job)
2512
2513     return (success, msg)
2514
2515   @_RequireOpenQueue
2516   def _ArchiveJobsUnlocked(self, jobs):
2517     """Archives jobs.
2518
2519     @type jobs: list of L{_QueuedJob}
2520     @param jobs: Job objects
2521     @rtype: int
2522     @return: Number of archived jobs
2523
2524     """
2525     archive_jobs = []
2526     rename_files = []
2527     for job in jobs:
2528       assert job.writable, "Can't archive read-only job"
2529       assert not job.archived, "Can't cancel archived job"
2530
2531       if job.CalcStatus() not in constants.JOBS_FINALIZED:
2532         logging.debug("Job %s is not yet done", job.id)
2533         continue
2534
2535       archive_jobs.append(job)
2536
2537       old = self._GetJobPath(job.id)
2538       new = self._GetArchivedJobPath(job.id)
2539       rename_files.append((old, new))
2540
2541     # TODO: What if 1..n files fail to rename?
2542     self._RenameFilesUnlocked(rename_files)
2543
2544     logging.debug("Successfully archived job(s) %s",
2545                   utils.CommaJoin(job.id for job in archive_jobs))
2546
2547     # Since we haven't quite checked, above, if we succeeded or failed renaming
2548     # the files, we update the cached queue size from the filesystem. When we
2549     # get around to fix the TODO: above, we can use the number of actually
2550     # archived jobs to fix this.
2551     self._UpdateQueueSizeUnlocked()
2552     return len(archive_jobs)
2553
2554   @locking.ssynchronized(_LOCK)
2555   @_RequireOpenQueue
2556   def ArchiveJob(self, job_id):
2557     """Archives a job.
2558
2559     This is just a wrapper over L{_ArchiveJobsUnlocked}.
2560
2561     @type job_id: int
2562     @param job_id: Job ID of job to be archived.
2563     @rtype: bool
2564     @return: Whether job was archived
2565
2566     """
2567     logging.info("Archiving job %s", job_id)
2568
2569     job = self._LoadJobUnlocked(job_id)
2570     if not job:
2571       logging.debug("Job %s not found", job_id)
2572       return False
2573
2574     return self._ArchiveJobsUnlocked([job]) == 1
2575
2576   @locking.ssynchronized(_LOCK)
2577   @_RequireOpenQueue
2578   def AutoArchiveJobs(self, age, timeout):
2579     """Archives all jobs based on age.
2580
2581     The method will archive all jobs which are older than the age
2582     parameter. For jobs that don't have an end timestamp, the start
2583     timestamp will be considered. The special '-1' age will cause
2584     archival of all jobs (that are not running or queued).
2585
2586     @type age: int
2587     @param age: the minimum age in seconds
2588
2589     """
2590     logging.info("Archiving jobs with age more than %s seconds", age)
2591
2592     now = time.time()
2593     end_time = now + timeout
2594     archived_count = 0
2595     last_touched = 0
2596
2597     all_job_ids = self._GetJobIDsUnlocked()
2598     pending = []
2599     for idx, job_id in enumerate(all_job_ids):
2600       last_touched = idx + 1
2601
2602       # Not optimal because jobs could be pending
2603       # TODO: Measure average duration for job archival and take number of
2604       # pending jobs into account.
2605       if time.time() > end_time:
2606         break
2607
2608       # Returns None if the job failed to load
2609       job = self._LoadJobUnlocked(job_id)
2610       if job:
2611         if job.end_timestamp is None:
2612           if job.start_timestamp is None:
2613             job_age = job.received_timestamp
2614           else:
2615             job_age = job.start_timestamp
2616         else:
2617           job_age = job.end_timestamp
2618
2619         if age == -1 or now - job_age[0] > age:
2620           pending.append(job)
2621
2622           # Archive 10 jobs at a time
2623           if len(pending) >= 10:
2624             archived_count += self._ArchiveJobsUnlocked(pending)
2625             pending = []
2626
2627     if pending:
2628       archived_count += self._ArchiveJobsUnlocked(pending)
2629
2630     return (archived_count, len(all_job_ids) - last_touched)
2631
2632   def _Query(self, fields, qfilter):
2633     qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
2634                        namefield="id")
2635
2636     # Archived jobs are only looked at if the "archived" field is referenced
2637     # either as a requested field or in the filter. By default archived jobs
2638     # are ignored.
2639     include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())
2640
2641     job_ids = qobj.RequestedNames()
2642
2643     list_all = (job_ids is None)
2644
2645     if list_all:
2646       # Since files are added to/removed from the queue atomically, there's no
2647       # risk of getting the job ids in an inconsistent state.
2648       job_ids = self._GetJobIDsUnlocked(archived=include_archived)
2649
2650     jobs = []
2651
2652     for job_id in job_ids:
2653       job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2654       if job is not None or not list_all:
2655         jobs.append((job_id, job))
2656
2657     return (qobj, jobs, list_all)
2658
2659   def QueryJobs(self, fields, qfilter):
2660     """Returns a list of jobs in queue.
2661
2662     @type fields: sequence
2663     @param fields: List of wanted fields
2664     @type qfilter: None or query2 filter (list)
2665     @param qfilter: Query filter
2666
2667     """
2668     (qobj, ctx, _) = self._Query(fields, qfilter)
2669
2670     return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
2671
2672   def OldStyleQueryJobs(self, job_ids, fields):
2673     """Returns a list of jobs in queue.
2674
2675     @type job_ids: list
2676     @param job_ids: sequence of job identifiers or None for all
2677     @type fields: list
2678     @param fields: names of fields to return
2679     @rtype: list
2680     @return: list one element per job, each element being list with
2681         the requested fields
2682
2683     """
2684     # backwards compat:
2685     job_ids = [int(jid) for jid in job_ids]
2686     qfilter = qlang.MakeSimpleFilter("id", job_ids)
2687
2688     (qobj, ctx, _) = self._Query(fields, qfilter)
2689
2690     return qobj.OldStyleQuery(ctx, sort_by_name=False)
2691
2692   @locking.ssynchronized(_LOCK)
2693   def PrepareShutdown(self):
2694     """Prepare to stop the job queue.
2695
2696     Disables execution of jobs in the workerpool and returns whether there are
2697     any jobs currently running. If the latter is the case, the job queue is not
2698     yet ready for shutdown. Once this function returns C{True} L{Shutdown} can
2699     be called without interfering with any job. Queued and unfinished jobs will
2700     be resumed next time.
2701
2702     Once this function has been called no new job submissions will be accepted
2703     (see L{_RequireNonDrainedQueue}).
2704
2705     @rtype: bool
2706     @return: Whether there are any running jobs
2707
2708     """
2709     if self._accepting_jobs:
2710       self._accepting_jobs = False
2711
2712       # Tell worker pool to stop processing pending tasks
2713       self._wpool.SetActive(False)
2714
2715     return self._wpool.HasRunningTasks()
2716
2717   def AcceptingJobsUnlocked(self):
2718     """Returns whether jobs are accepted.
2719
2720     Once L{PrepareShutdown} has been called, no new jobs are accepted and the
2721     queue is shutting down.
2722
2723     @rtype: bool
2724
2725     """
2726     return self._accepting_jobs
2727
2728   @locking.ssynchronized(_LOCK)
2729   @_RequireOpenQueue
2730   def Shutdown(self):
2731     """Stops the job queue.
2732
2733     This shutdowns all the worker threads an closes the queue.
2734
2735     """
2736     self._wpool.TerminateWorkers()
2737
2738     self._queue_filelock.Close()
2739     self._queue_filelock = None