Add command-line support for multiple specs in ipolicy
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import logging
33 import errno
34 import time
35 import weakref
36 import threading
37 import itertools
38 import operator
39
40 try:
41   # pylint: disable=E0611
42   from pyinotify import pyinotify
43 except ImportError:
44   import pyinotify
45
46 from ganeti import asyncnotifier
47 from ganeti import constants
48 from ganeti import serializer
49 from ganeti import workerpool
50 from ganeti import locking
51 from ganeti import opcodes
52 from ganeti import errors
53 from ganeti import mcpu
54 from ganeti import utils
55 from ganeti import jstore
56 from ganeti import rpc
57 from ganeti import runtime
58 from ganeti import netutils
59 from ganeti import compat
60 from ganeti import ht
61 from ganeti import query
62 from ganeti import qlang
63 from ganeti import pathutils
64 from ganeti import vcluster
65
66
67 JOBQUEUE_THREADS = 25
68
69 # member lock names to be passed to @ssynchronized decorator
70 _LOCK = "_lock"
71 _QUEUE = "_queue"
72
73 #: Retrieves "id" attribute
74 _GetIdAttr = operator.attrgetter("id")
75
76
77 class CancelJob(Exception):
78   """Special exception to cancel a job.
79
80   """
81
82
83 class QueueShutdown(Exception):
84   """Special exception to abort a job when the job queue is shutting down.
85
86   """
87
88
89 def TimeStampNow():
90   """Returns the current timestamp.
91
92   @rtype: tuple
93   @return: the current time in the (seconds, microseconds) format
94
95   """
96   return utils.SplitTime(time.time())
97
98
99 def _CallJqUpdate(runner, names, file_name, content):
100   """Updates job queue file after virtualizing filename.
101
102   """
103   virt_file_name = vcluster.MakeVirtualPath(file_name)
104   return runner.call_jobqueue_update(names, virt_file_name, content)
105
106
107 class _SimpleJobQuery:
108   """Wrapper for job queries.
109
110   Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.
111
112   """
113   def __init__(self, fields):
114     """Initializes this class.
115
116     """
117     self._query = query.Query(query.JOB_FIELDS, fields)
118
119   def __call__(self, job):
120     """Executes a job query using cached field list.
121
122     """
123     return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
124
125
126 class _QueuedOpCode(object):
127   """Encapsulates an opcode object.
128
129   @ivar log: holds the execution log and consists of tuples
130   of the form C{(log_serial, timestamp, level, message)}
131   @ivar input: the OpCode we encapsulate
132   @ivar status: the current status
133   @ivar result: the result of the LU execution
134   @ivar start_timestamp: timestamp for the start of the execution
135   @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
136   @ivar stop_timestamp: timestamp for the end of the execution
137
138   """
139   __slots__ = ["input", "status", "result", "log", "priority",
140                "start_timestamp", "exec_timestamp", "end_timestamp",
141                "__weakref__"]
142
143   def __init__(self, op):
144     """Initializes instances of this class.
145
146     @type op: L{opcodes.OpCode}
147     @param op: the opcode we encapsulate
148
149     """
150     self.input = op
151     self.status = constants.OP_STATUS_QUEUED
152     self.result = None
153     self.log = []
154     self.start_timestamp = None
155     self.exec_timestamp = None
156     self.end_timestamp = None
157
158     # Get initial priority (it might change during the lifetime of this opcode)
159     self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
160
161   @classmethod
162   def Restore(cls, state):
163     """Restore the _QueuedOpCode from the serialized form.
164
165     @type state: dict
166     @param state: the serialized state
167     @rtype: _QueuedOpCode
168     @return: a new _QueuedOpCode instance
169
170     """
171     obj = _QueuedOpCode.__new__(cls)
172     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
173     obj.status = state["status"]
174     obj.result = state["result"]
175     obj.log = state["log"]
176     obj.start_timestamp = state.get("start_timestamp", None)
177     obj.exec_timestamp = state.get("exec_timestamp", None)
178     obj.end_timestamp = state.get("end_timestamp", None)
179     obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
180     return obj
181
182   def Serialize(self):
183     """Serializes this _QueuedOpCode.
184
185     @rtype: dict
186     @return: the dictionary holding the serialized state
187
188     """
189     return {
190       "input": self.input.__getstate__(),
191       "status": self.status,
192       "result": self.result,
193       "log": self.log,
194       "start_timestamp": self.start_timestamp,
195       "exec_timestamp": self.exec_timestamp,
196       "end_timestamp": self.end_timestamp,
197       "priority": self.priority,
198       }
199
200
201 class _QueuedJob(object):
202   """In-memory job representation.
203
204   This is what we use to track the user-submitted jobs. Locking must
205   be taken care of by users of this class.
206
207   @type queue: L{JobQueue}
208   @ivar queue: the parent queue
209   @ivar id: the job ID
210   @type ops: list
211   @ivar ops: the list of _QueuedOpCode that constitute the job
212   @type log_serial: int
213   @ivar log_serial: holds the index for the next log entry
214   @ivar received_timestamp: the timestamp for when the job was received
215   @ivar start_timestmap: the timestamp for start of execution
216   @ivar end_timestamp: the timestamp for end of execution
217   @ivar writable: Whether the job is allowed to be modified
218
219   """
220   # pylint: disable=W0212
221   __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
222                "received_timestamp", "start_timestamp", "end_timestamp",
223                "__weakref__", "processor_lock", "writable", "archived"]
224
225   def _AddReasons(self):
226     """Extend the reason trail
227
228     Add the reason for all the opcodes of this job to be executed.
229
230     """
231     count = 0
232     for queued_op in self.ops:
233       op = queued_op.input
234       reason_src = opcodes.NameToReasonSrc(op.__class__.__name__)
235       reason_text = "job=%d;index=%d" % (self.id, count)
236       reason = getattr(op, "reason", [])
237       reason.append((reason_src, reason_text, utils.EpochNano()))
238       op.reason = reason
239       count = count + 1
240
241   def __init__(self, queue, job_id, ops, writable):
242     """Constructor for the _QueuedJob.
243
244     @type queue: L{JobQueue}
245     @param queue: our parent queue
246     @type job_id: job_id
247     @param job_id: our job id
248     @type ops: list
249     @param ops: the list of opcodes we hold, which will be encapsulated
250         in _QueuedOpCodes
251     @type writable: bool
252     @param writable: Whether job can be modified
253
254     """
255     if not ops:
256       raise errors.GenericError("A job needs at least one opcode")
257
258     self.queue = queue
259     self.id = int(job_id)
260     self.ops = [_QueuedOpCode(op) for op in ops]
261     self._AddReasons()
262     self.log_serial = 0
263     self.received_timestamp = TimeStampNow()
264     self.start_timestamp = None
265     self.end_timestamp = None
266     self.archived = False
267
268     self._InitInMemory(self, writable)
269
270     assert not self.archived, "New jobs can not be marked as archived"
271
272   @staticmethod
273   def _InitInMemory(obj, writable):
274     """Initializes in-memory variables.
275
276     """
277     obj.writable = writable
278     obj.ops_iter = None
279     obj.cur_opctx = None
280
281     # Read-only jobs are not processed and therefore don't need a lock
282     if writable:
283       obj.processor_lock = threading.Lock()
284     else:
285       obj.processor_lock = None
286
287   def __repr__(self):
288     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
289               "id=%s" % self.id,
290               "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
291
292     return "<%s at %#x>" % (" ".join(status), id(self))
293
294   @classmethod
295   def Restore(cls, queue, state, writable, archived):
296     """Restore a _QueuedJob from serialized state:
297
298     @type queue: L{JobQueue}
299     @param queue: to which queue the restored job belongs
300     @type state: dict
301     @param state: the serialized state
302     @type writable: bool
303     @param writable: Whether job can be modified
304     @type archived: bool
305     @param archived: Whether job was already archived
306     @rtype: _JobQueue
307     @return: the restored _JobQueue instance
308
309     """
310     obj = _QueuedJob.__new__(cls)
311     obj.queue = queue
312     obj.id = int(state["id"])
313     obj.received_timestamp = state.get("received_timestamp", None)
314     obj.start_timestamp = state.get("start_timestamp", None)
315     obj.end_timestamp = state.get("end_timestamp", None)
316     obj.archived = archived
317
318     obj.ops = []
319     obj.log_serial = 0
320     for op_state in state["ops"]:
321       op = _QueuedOpCode.Restore(op_state)
322       for log_entry in op.log:
323         obj.log_serial = max(obj.log_serial, log_entry[0])
324       obj.ops.append(op)
325
326     cls._InitInMemory(obj, writable)
327
328     return obj
329
330   def Serialize(self):
331     """Serialize the _JobQueue instance.
332
333     @rtype: dict
334     @return: the serialized state
335
336     """
337     return {
338       "id": self.id,
339       "ops": [op.Serialize() for op in self.ops],
340       "start_timestamp": self.start_timestamp,
341       "end_timestamp": self.end_timestamp,
342       "received_timestamp": self.received_timestamp,
343       }
344
345   def CalcStatus(self):
346     """Compute the status of this job.
347
348     This function iterates over all the _QueuedOpCodes in the job and
349     based on their status, computes the job status.
350
351     The algorithm is:
352       - if we find a cancelled, or finished with error, the job
353         status will be the same
354       - otherwise, the last opcode with the status one of:
355           - waitlock
356           - canceling
357           - running
358
359         will determine the job status
360
361       - otherwise, it means either all opcodes are queued, or success,
362         and the job status will be the same
363
364     @return: the job status
365
366     """
367     status = constants.JOB_STATUS_QUEUED
368
369     all_success = True
370     for op in self.ops:
371       if op.status == constants.OP_STATUS_SUCCESS:
372         continue
373
374       all_success = False
375
376       if op.status == constants.OP_STATUS_QUEUED:
377         pass
378       elif op.status == constants.OP_STATUS_WAITING:
379         status = constants.JOB_STATUS_WAITING
380       elif op.status == constants.OP_STATUS_RUNNING:
381         status = constants.JOB_STATUS_RUNNING
382       elif op.status == constants.OP_STATUS_CANCELING:
383         status = constants.JOB_STATUS_CANCELING
384         break
385       elif op.status == constants.OP_STATUS_ERROR:
386         status = constants.JOB_STATUS_ERROR
387         # The whole job fails if one opcode failed
388         break
389       elif op.status == constants.OP_STATUS_CANCELED:
390         status = constants.OP_STATUS_CANCELED
391         break
392
393     if all_success:
394       status = constants.JOB_STATUS_SUCCESS
395
396     return status
397
398   def CalcPriority(self):
399     """Gets the current priority for this job.
400
401     Only unfinished opcodes are considered. When all are done, the default
402     priority is used.
403
404     @rtype: int
405
406     """
407     priorities = [op.priority for op in self.ops
408                   if op.status not in constants.OPS_FINALIZED]
409
410     if not priorities:
411       # All opcodes are done, assume default priority
412       return constants.OP_PRIO_DEFAULT
413
414     return min(priorities)
415
416   def GetLogEntries(self, newer_than):
417     """Selectively returns the log entries.
418
419     @type newer_than: None or int
420     @param newer_than: if this is None, return all log entries,
421         otherwise return only the log entries with serial higher
422         than this value
423     @rtype: list
424     @return: the list of the log entries selected
425
426     """
427     if newer_than is None:
428       serial = -1
429     else:
430       serial = newer_than
431
432     entries = []
433     for op in self.ops:
434       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
435
436     return entries
437
438   def GetInfo(self, fields):
439     """Returns information about a job.
440
441     @type fields: list
442     @param fields: names of fields to return
443     @rtype: list
444     @return: list with one element for each field
445     @raise errors.OpExecError: when an invalid field
446         has been passed
447
448     """
449     return _SimpleJobQuery(fields)(self)
450
451   def MarkUnfinishedOps(self, status, result):
452     """Mark unfinished opcodes with a given status and result.
453
454     This is an utility function for marking all running or waiting to
455     be run opcodes with a given status. Opcodes which are already
456     finalised are not changed.
457
458     @param status: a given opcode status
459     @param result: the opcode result
460
461     """
462     not_marked = True
463     for op in self.ops:
464       if op.status in constants.OPS_FINALIZED:
465         assert not_marked, "Finalized opcodes found after non-finalized ones"
466         continue
467       op.status = status
468       op.result = result
469       not_marked = False
470
471   def Finalize(self):
472     """Marks the job as finalized.
473
474     """
475     self.end_timestamp = TimeStampNow()
476
477   def Cancel(self):
478     """Marks job as canceled/-ing if possible.
479
480     @rtype: tuple; (bool, string)
481     @return: Boolean describing whether job was successfully canceled or marked
482       as canceling and a text message
483
484     """
485     status = self.CalcStatus()
486
487     if status == constants.JOB_STATUS_QUEUED:
488       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
489                              "Job canceled by request")
490       self.Finalize()
491       return (True, "Job %s canceled" % self.id)
492
493     elif status == constants.JOB_STATUS_WAITING:
494       # The worker will notice the new status and cancel the job
495       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
496       return (True, "Job %s will be canceled" % self.id)
497
498     else:
499       logging.debug("Job %s is no longer waiting in the queue", self.id)
500       return (False, "Job %s is no longer waiting in the queue" % self.id)
501
502   def ChangePriority(self, priority):
503     """Changes the job priority.
504
505     @type priority: int
506     @param priority: New priority
507     @rtype: tuple; (bool, string)
508     @return: Boolean describing whether job's priority was successfully changed
509       and a text message
510
511     """
512     status = self.CalcStatus()
513
514     if status in constants.JOBS_FINALIZED:
515       return (False, "Job %s is finished" % self.id)
516     elif status == constants.JOB_STATUS_CANCELING:
517       return (False, "Job %s is cancelling" % self.id)
518     else:
519       assert status in (constants.JOB_STATUS_QUEUED,
520                         constants.JOB_STATUS_WAITING,
521                         constants.JOB_STATUS_RUNNING)
522
523       changed = False
524       for op in self.ops:
525         if (op.status == constants.OP_STATUS_RUNNING or
526             op.status in constants.OPS_FINALIZED):
527           assert not changed, \
528             ("Found opcode for which priority should not be changed after"
529              " priority has been changed for previous opcodes")
530           continue
531
532         assert op.status in (constants.OP_STATUS_QUEUED,
533                              constants.OP_STATUS_WAITING)
534
535         changed = True
536
537         # Set new priority (doesn't modify opcode input)
538         op.priority = priority
539
540       if changed:
541         return (True, ("Priorities of pending opcodes for job %s have been"
542                        " changed to %s" % (self.id, priority)))
543       else:
544         return (False, "Job %s had no pending opcodes" % self.id)
545
546
547 class _OpExecCallbacks(mcpu.OpExecCbBase):
548   def __init__(self, queue, job, op):
549     """Initializes this class.
550
551     @type queue: L{JobQueue}
552     @param queue: Job queue
553     @type job: L{_QueuedJob}
554     @param job: Job object
555     @type op: L{_QueuedOpCode}
556     @param op: OpCode
557
558     """
559     assert queue, "Queue is missing"
560     assert job, "Job is missing"
561     assert op, "Opcode is missing"
562
563     self._queue = queue
564     self._job = job
565     self._op = op
566
567   def _CheckCancel(self):
568     """Raises an exception to cancel the job if asked to.
569
570     """
571     # Cancel here if we were asked to
572     if self._op.status == constants.OP_STATUS_CANCELING:
573       logging.debug("Canceling opcode")
574       raise CancelJob()
575
576     # See if queue is shutting down
577     if not self._queue.AcceptingJobsUnlocked():
578       logging.debug("Queue is shutting down")
579       raise QueueShutdown()
580
581   @locking.ssynchronized(_QUEUE, shared=1)
582   def NotifyStart(self):
583     """Mark the opcode as running, not lock-waiting.
584
585     This is called from the mcpu code as a notifier function, when the LU is
586     finally about to start the Exec() method. Of course, to have end-user
587     visible results, the opcode must be initially (before calling into
588     Processor.ExecOpCode) set to OP_STATUS_WAITING.
589
590     """
591     assert self._op in self._job.ops
592     assert self._op.status in (constants.OP_STATUS_WAITING,
593                                constants.OP_STATUS_CANCELING)
594
595     # Cancel here if we were asked to
596     self._CheckCancel()
597
598     logging.debug("Opcode is now running")
599
600     self._op.status = constants.OP_STATUS_RUNNING
601     self._op.exec_timestamp = TimeStampNow()
602
603     # And finally replicate the job status
604     self._queue.UpdateJobUnlocked(self._job)
605
606   @locking.ssynchronized(_QUEUE, shared=1)
607   def _AppendFeedback(self, timestamp, log_type, log_msg):
608     """Internal feedback append function, with locks
609
610     """
611     self._job.log_serial += 1
612     self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
613     self._queue.UpdateJobUnlocked(self._job, replicate=False)
614
615   def Feedback(self, *args):
616     """Append a log entry.
617
618     """
619     assert len(args) < 3
620
621     if len(args) == 1:
622       log_type = constants.ELOG_MESSAGE
623       log_msg = args[0]
624     else:
625       (log_type, log_msg) = args
626
627     # The time is split to make serialization easier and not lose
628     # precision.
629     timestamp = utils.SplitTime(time.time())
630     self._AppendFeedback(timestamp, log_type, log_msg)
631
632   def CurrentPriority(self):
633     """Returns current priority for opcode.
634
635     """
636     assert self._op.status in (constants.OP_STATUS_WAITING,
637                                constants.OP_STATUS_CANCELING)
638
639     # Cancel here if we were asked to
640     self._CheckCancel()
641
642     return self._op.priority
643
644   def SubmitManyJobs(self, jobs):
645     """Submits jobs for processing.
646
647     See L{JobQueue.SubmitManyJobs}.
648
649     """
650     # Locking is done in job queue
651     return self._queue.SubmitManyJobs(jobs)
652
653
654 class _JobChangesChecker(object):
655   def __init__(self, fields, prev_job_info, prev_log_serial):
656     """Initializes this class.
657
658     @type fields: list of strings
659     @param fields: Fields requested by LUXI client
660     @type prev_job_info: string
661     @param prev_job_info: previous job info, as passed by the LUXI client
662     @type prev_log_serial: string
663     @param prev_log_serial: previous job serial, as passed by the LUXI client
664
665     """
666     self._squery = _SimpleJobQuery(fields)
667     self._prev_job_info = prev_job_info
668     self._prev_log_serial = prev_log_serial
669
670   def __call__(self, job):
671     """Checks whether job has changed.
672
673     @type job: L{_QueuedJob}
674     @param job: Job object
675
676     """
677     assert not job.writable, "Expected read-only job"
678
679     status = job.CalcStatus()
680     job_info = self._squery(job)
681     log_entries = job.GetLogEntries(self._prev_log_serial)
682
683     # Serializing and deserializing data can cause type changes (e.g. from
684     # tuple to list) or precision loss. We're doing it here so that we get
685     # the same modifications as the data received from the client. Without
686     # this, the comparison afterwards might fail without the data being
687     # significantly different.
688     # TODO: we just deserialized from disk, investigate how to make sure that
689     # the job info and log entries are compatible to avoid this further step.
690     # TODO: Doing something like in testutils.py:UnifyValueType might be more
691     # efficient, though floats will be tricky
692     job_info = serializer.LoadJson(serializer.DumpJson(job_info))
693     log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
694
695     # Don't even try to wait if the job is no longer running, there will be
696     # no changes.
697     if (status not in (constants.JOB_STATUS_QUEUED,
698                        constants.JOB_STATUS_RUNNING,
699                        constants.JOB_STATUS_WAITING) or
700         job_info != self._prev_job_info or
701         (log_entries and self._prev_log_serial != log_entries[0][0])):
702       logging.debug("Job %s changed", job.id)
703       return (job_info, log_entries)
704
705     return None
706
707
708 class _JobFileChangesWaiter(object):
709   def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
710     """Initializes this class.
711
712     @type filename: string
713     @param filename: Path to job file
714     @raises errors.InotifyError: if the notifier cannot be setup
715
716     """
717     self._wm = _inotify_wm_cls()
718     self._inotify_handler = \
719       asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
720     self._notifier = \
721       pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
722     try:
723       self._inotify_handler.enable()
724     except Exception:
725       # pyinotify doesn't close file descriptors automatically
726       self._notifier.stop()
727       raise
728
729   def _OnInotify(self, notifier_enabled):
730     """Callback for inotify.
731
732     """
733     if not notifier_enabled:
734       self._inotify_handler.enable()
735
736   def Wait(self, timeout):
737     """Waits for the job file to change.
738
739     @type timeout: float
740     @param timeout: Timeout in seconds
741     @return: Whether there have been events
742
743     """
744     assert timeout >= 0
745     have_events = self._notifier.check_events(timeout * 1000)
746     if have_events:
747       self._notifier.read_events()
748     self._notifier.process_events()
749     return have_events
750
751   def Close(self):
752     """Closes underlying notifier and its file descriptor.
753
754     """
755     self._notifier.stop()
756
757
758 class _JobChangesWaiter(object):
759   def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
760     """Initializes this class.
761
762     @type filename: string
763     @param filename: Path to job file
764
765     """
766     self._filewaiter = None
767     self._filename = filename
768     self._waiter_cls = _waiter_cls
769
770   def Wait(self, timeout):
771     """Waits for a job to change.
772
773     @type timeout: float
774     @param timeout: Timeout in seconds
775     @return: Whether there have been events
776
777     """
778     if self._filewaiter:
779       return self._filewaiter.Wait(timeout)
780
781     # Lazy setup: Avoid inotify setup cost when job file has already changed.
782     # If this point is reached, return immediately and let caller check the job
783     # file again in case there were changes since the last check. This avoids a
784     # race condition.
785     self._filewaiter = self._waiter_cls(self._filename)
786
787     return True
788
789   def Close(self):
790     """Closes underlying waiter.
791
792     """
793     if self._filewaiter:
794       self._filewaiter.Close()
795
796
797 class _WaitForJobChangesHelper(object):
798   """Helper class using inotify to wait for changes in a job file.
799
800   This class takes a previous job status and serial, and alerts the client when
801   the current job status has changed.
802
803   """
804   @staticmethod
805   def _CheckForChanges(counter, job_load_fn, check_fn):
806     if counter.next() > 0:
807       # If this isn't the first check the job is given some more time to change
808       # again. This gives better performance for jobs generating many
809       # changes/messages.
810       time.sleep(0.1)
811
812     job = job_load_fn()
813     if not job:
814       raise errors.JobLost()
815
816     result = check_fn(job)
817     if result is None:
818       raise utils.RetryAgain()
819
820     return result
821
822   def __call__(self, filename, job_load_fn,
823                fields, prev_job_info, prev_log_serial, timeout,
824                _waiter_cls=_JobChangesWaiter):
825     """Waits for changes on a job.
826
827     @type filename: string
828     @param filename: File on which to wait for changes
829     @type job_load_fn: callable
830     @param job_load_fn: Function to load job
831     @type fields: list of strings
832     @param fields: Which fields to check for changes
833     @type prev_job_info: list or None
834     @param prev_job_info: Last job information returned
835     @type prev_log_serial: int
836     @param prev_log_serial: Last job message serial number
837     @type timeout: float
838     @param timeout: maximum time to wait in seconds
839
840     """
841     counter = itertools.count()
842     try:
843       check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
844       waiter = _waiter_cls(filename)
845       try:
846         return utils.Retry(compat.partial(self._CheckForChanges,
847                                           counter, job_load_fn, check_fn),
848                            utils.RETRY_REMAINING_TIME, timeout,
849                            wait_fn=waiter.Wait)
850       finally:
851         waiter.Close()
852     except errors.JobLost:
853       return None
854     except utils.RetryTimeout:
855       return constants.JOB_NOTCHANGED
856
857
858 def _EncodeOpError(err):
859   """Encodes an error which occurred while processing an opcode.
860
861   """
862   if isinstance(err, errors.GenericError):
863     to_encode = err
864   else:
865     to_encode = errors.OpExecError(str(err))
866
867   return errors.EncodeException(to_encode)
868
869
870 class _TimeoutStrategyWrapper:
871   def __init__(self, fn):
872     """Initializes this class.
873
874     """
875     self._fn = fn
876     self._next = None
877
878   def _Advance(self):
879     """Gets the next timeout if necessary.
880
881     """
882     if self._next is None:
883       self._next = self._fn()
884
885   def Peek(self):
886     """Returns the next timeout.
887
888     """
889     self._Advance()
890     return self._next
891
892   def Next(self):
893     """Returns the current timeout and advances the internal state.
894
895     """
896     self._Advance()
897     result = self._next
898     self._next = None
899     return result
900
901
902 class _OpExecContext:
903   def __init__(self, op, index, log_prefix, timeout_strategy_factory):
904     """Initializes this class.
905
906     """
907     self.op = op
908     self.index = index
909     self.log_prefix = log_prefix
910     self.summary = op.input.Summary()
911
912     # Create local copy to modify
913     if getattr(op.input, opcodes.DEPEND_ATTR, None):
914       self.jobdeps = op.input.depends[:]
915     else:
916       self.jobdeps = None
917
918     self._timeout_strategy_factory = timeout_strategy_factory
919     self._ResetTimeoutStrategy()
920
921   def _ResetTimeoutStrategy(self):
922     """Creates a new timeout strategy.
923
924     """
925     self._timeout_strategy = \
926       _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
927
928   def CheckPriorityIncrease(self):
929     """Checks whether priority can and should be increased.
930
931     Called when locks couldn't be acquired.
932
933     """
934     op = self.op
935
936     # Exhausted all retries and next round should not use blocking acquire
937     # for locks?
938     if (self._timeout_strategy.Peek() is None and
939         op.priority > constants.OP_PRIO_HIGHEST):
940       logging.debug("Increasing priority")
941       op.priority -= 1
942       self._ResetTimeoutStrategy()
943       return True
944
945     return False
946
947   def GetNextLockTimeout(self):
948     """Returns the next lock acquire timeout.
949
950     """
951     return self._timeout_strategy.Next()
952
953
954 class _JobProcessor(object):
955   (DEFER,
956    WAITDEP,
957    FINISHED) = range(1, 4)
958
959   def __init__(self, queue, opexec_fn, job,
960                _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
961     """Initializes this class.
962
963     """
964     self.queue = queue
965     self.opexec_fn = opexec_fn
966     self.job = job
967     self._timeout_strategy_factory = _timeout_strategy_factory
968
969   @staticmethod
970   def _FindNextOpcode(job, timeout_strategy_factory):
971     """Locates the next opcode to run.
972
973     @type job: L{_QueuedJob}
974     @param job: Job object
975     @param timeout_strategy_factory: Callable to create new timeout strategy
976
977     """
978     # Create some sort of a cache to speed up locating next opcode for future
979     # lookups
980     # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
981     # pending and one for processed ops.
982     if job.ops_iter is None:
983       job.ops_iter = enumerate(job.ops)
984
985     # Find next opcode to run
986     while True:
987       try:
988         (idx, op) = job.ops_iter.next()
989       except StopIteration:
990         raise errors.ProgrammerError("Called for a finished job")
991
992       if op.status == constants.OP_STATUS_RUNNING:
993         # Found an opcode already marked as running
994         raise errors.ProgrammerError("Called for job marked as running")
995
996       opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
997                              timeout_strategy_factory)
998
999       if op.status not in constants.OPS_FINALIZED:
1000         return opctx
1001
1002       # This is a job that was partially completed before master daemon
1003       # shutdown, so it can be expected that some opcodes are already
1004       # completed successfully (if any did error out, then the whole job
1005       # should have been aborted and not resubmitted for processing).
1006       logging.info("%s: opcode %s already processed, skipping",
1007                    opctx.log_prefix, opctx.summary)
1008
1009   @staticmethod
1010   def _MarkWaitlock(job, op):
1011     """Marks an opcode as waiting for locks.
1012
1013     The job's start timestamp is also set if necessary.
1014
1015     @type job: L{_QueuedJob}
1016     @param job: Job object
1017     @type op: L{_QueuedOpCode}
1018     @param op: Opcode object
1019
1020     """
1021     assert op in job.ops
1022     assert op.status in (constants.OP_STATUS_QUEUED,
1023                          constants.OP_STATUS_WAITING)
1024
1025     update = False
1026
1027     op.result = None
1028
1029     if op.status == constants.OP_STATUS_QUEUED:
1030       op.status = constants.OP_STATUS_WAITING
1031       update = True
1032
1033     if op.start_timestamp is None:
1034       op.start_timestamp = TimeStampNow()
1035       update = True
1036
1037     if job.start_timestamp is None:
1038       job.start_timestamp = op.start_timestamp
1039       update = True
1040
1041     assert op.status == constants.OP_STATUS_WAITING
1042
1043     return update
1044
1045   @staticmethod
1046   def _CheckDependencies(queue, job, opctx):
1047     """Checks if an opcode has dependencies and if so, processes them.
1048
1049     @type queue: L{JobQueue}
1050     @param queue: Queue object
1051     @type job: L{_QueuedJob}
1052     @param job: Job object
1053     @type opctx: L{_OpExecContext}
1054     @param opctx: Opcode execution context
1055     @rtype: bool
1056     @return: Whether opcode will be re-scheduled by dependency tracker
1057
1058     """
1059     op = opctx.op
1060
1061     result = False
1062
1063     while opctx.jobdeps:
1064       (dep_job_id, dep_status) = opctx.jobdeps[0]
1065
1066       (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
1067                                                           dep_status)
1068       assert ht.TNonEmptyString(depmsg), "No dependency message"
1069
1070       logging.info("%s: %s", opctx.log_prefix, depmsg)
1071
1072       if depresult == _JobDependencyManager.CONTINUE:
1073         # Remove dependency and continue
1074         opctx.jobdeps.pop(0)
1075
1076       elif depresult == _JobDependencyManager.WAIT:
1077         # Need to wait for notification, dependency tracker will re-add job
1078         # to workerpool
1079         result = True
1080         break
1081
1082       elif depresult == _JobDependencyManager.CANCEL:
1083         # Job was cancelled, cancel this job as well
1084         job.Cancel()
1085         assert op.status == constants.OP_STATUS_CANCELING
1086         break
1087
1088       elif depresult in (_JobDependencyManager.WRONGSTATUS,
1089                          _JobDependencyManager.ERROR):
1090         # Job failed or there was an error, this job must fail
1091         op.status = constants.OP_STATUS_ERROR
1092         op.result = _EncodeOpError(errors.OpExecError(depmsg))
1093         break
1094
1095       else:
1096         raise errors.ProgrammerError("Unknown dependency result '%s'" %
1097                                      depresult)
1098
1099     return result
1100
1101   def _ExecOpCodeUnlocked(self, opctx):
1102     """Processes one opcode and returns the result.
1103
1104     """
1105     op = opctx.op
1106
1107     assert op.status == constants.OP_STATUS_WAITING
1108
1109     timeout = opctx.GetNextLockTimeout()
1110
1111     try:
1112       # Make sure not to hold queue lock while calling ExecOpCode
1113       result = self.opexec_fn(op.input,
1114                               _OpExecCallbacks(self.queue, self.job, op),
1115                               timeout=timeout)
1116     except mcpu.LockAcquireTimeout:
1117       assert timeout is not None, "Received timeout for blocking acquire"
1118       logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1119
1120       assert op.status in (constants.OP_STATUS_WAITING,
1121                            constants.OP_STATUS_CANCELING)
1122
1123       # Was job cancelled while we were waiting for the lock?
1124       if op.status == constants.OP_STATUS_CANCELING:
1125         return (constants.OP_STATUS_CANCELING, None)
1126
1127       # Queue is shutting down, return to queued
1128       if not self.queue.AcceptingJobsUnlocked():
1129         return (constants.OP_STATUS_QUEUED, None)
1130
1131       # Stay in waitlock while trying to re-acquire lock
1132       return (constants.OP_STATUS_WAITING, None)
1133     except CancelJob:
1134       logging.exception("%s: Canceling job", opctx.log_prefix)
1135       assert op.status == constants.OP_STATUS_CANCELING
1136       return (constants.OP_STATUS_CANCELING, None)
1137
1138     except QueueShutdown:
1139       logging.exception("%s: Queue is shutting down", opctx.log_prefix)
1140
1141       assert op.status == constants.OP_STATUS_WAITING
1142
1143       # Job hadn't been started yet, so it should return to the queue
1144       return (constants.OP_STATUS_QUEUED, None)
1145
1146     except Exception, err: # pylint: disable=W0703
1147       logging.exception("%s: Caught exception in %s",
1148                         opctx.log_prefix, opctx.summary)
1149       return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1150     else:
1151       logging.debug("%s: %s successful",
1152                     opctx.log_prefix, opctx.summary)
1153       return (constants.OP_STATUS_SUCCESS, result)
1154
1155   def __call__(self, _nextop_fn=None):
1156     """Continues execution of a job.
1157
1158     @param _nextop_fn: Callback function for tests
1159     @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1160       be deferred and C{WAITDEP} if the dependency manager
1161       (L{_JobDependencyManager}) will re-schedule the job when appropriate
1162
1163     """
1164     queue = self.queue
1165     job = self.job
1166
1167     logging.debug("Processing job %s", job.id)
1168
1169     queue.acquire(shared=1)
1170     try:
1171       opcount = len(job.ops)
1172
1173       assert job.writable, "Expected writable job"
1174
1175       # Don't do anything for finalized jobs
1176       if job.CalcStatus() in constants.JOBS_FINALIZED:
1177         return self.FINISHED
1178
1179       # Is a previous opcode still pending?
1180       if job.cur_opctx:
1181         opctx = job.cur_opctx
1182         job.cur_opctx = None
1183       else:
1184         if __debug__ and _nextop_fn:
1185           _nextop_fn()
1186         opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1187
1188       op = opctx.op
1189
1190       # Consistency check
1191       assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1192                                      constants.OP_STATUS_CANCELING)
1193                         for i in job.ops[opctx.index + 1:])
1194
1195       assert op.status in (constants.OP_STATUS_QUEUED,
1196                            constants.OP_STATUS_WAITING,
1197                            constants.OP_STATUS_CANCELING)
1198
1199       assert (op.priority <= constants.OP_PRIO_LOWEST and
1200               op.priority >= constants.OP_PRIO_HIGHEST)
1201
1202       waitjob = None
1203
1204       if op.status != constants.OP_STATUS_CANCELING:
1205         assert op.status in (constants.OP_STATUS_QUEUED,
1206                              constants.OP_STATUS_WAITING)
1207
1208         # Prepare to start opcode
1209         if self._MarkWaitlock(job, op):
1210           # Write to disk
1211           queue.UpdateJobUnlocked(job)
1212
1213         assert op.status == constants.OP_STATUS_WAITING
1214         assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1215         assert job.start_timestamp and op.start_timestamp
1216         assert waitjob is None
1217
1218         # Check if waiting for a job is necessary
1219         waitjob = self._CheckDependencies(queue, job, opctx)
1220
1221         assert op.status in (constants.OP_STATUS_WAITING,
1222                              constants.OP_STATUS_CANCELING,
1223                              constants.OP_STATUS_ERROR)
1224
1225         if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1226                                          constants.OP_STATUS_ERROR)):
1227           logging.info("%s: opcode %s waiting for locks",
1228                        opctx.log_prefix, opctx.summary)
1229
1230           assert not opctx.jobdeps, "Not all dependencies were removed"
1231
1232           queue.release()
1233           try:
1234             (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1235           finally:
1236             queue.acquire(shared=1)
1237
1238           op.status = op_status
1239           op.result = op_result
1240
1241           assert not waitjob
1242
1243         if op.status in (constants.OP_STATUS_WAITING,
1244                          constants.OP_STATUS_QUEUED):
1245           # waiting: Couldn't get locks in time
1246           # queued: Queue is shutting down
1247           assert not op.end_timestamp
1248         else:
1249           # Finalize opcode
1250           op.end_timestamp = TimeStampNow()
1251
1252           if op.status == constants.OP_STATUS_CANCELING:
1253             assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1254                                   for i in job.ops[opctx.index:])
1255           else:
1256             assert op.status in constants.OPS_FINALIZED
1257
1258       if op.status == constants.OP_STATUS_QUEUED:
1259         # Queue is shutting down
1260         assert not waitjob
1261
1262         finalize = False
1263
1264         # Reset context
1265         job.cur_opctx = None
1266
1267         # In no case must the status be finalized here
1268         assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1269
1270       elif op.status == constants.OP_STATUS_WAITING or waitjob:
1271         finalize = False
1272
1273         if not waitjob and opctx.CheckPriorityIncrease():
1274           # Priority was changed, need to update on-disk file
1275           queue.UpdateJobUnlocked(job)
1276
1277         # Keep around for another round
1278         job.cur_opctx = opctx
1279
1280         assert (op.priority <= constants.OP_PRIO_LOWEST and
1281                 op.priority >= constants.OP_PRIO_HIGHEST)
1282
1283         # In no case must the status be finalized here
1284         assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1285
1286       else:
1287         # Ensure all opcodes so far have been successful
1288         assert (opctx.index == 0 or
1289                 compat.all(i.status == constants.OP_STATUS_SUCCESS
1290                            for i in job.ops[:opctx.index]))
1291
1292         # Reset context
1293         job.cur_opctx = None
1294
1295         if op.status == constants.OP_STATUS_SUCCESS:
1296           finalize = False
1297
1298         elif op.status == constants.OP_STATUS_ERROR:
1299           # Ensure failed opcode has an exception as its result
1300           assert errors.GetEncodedError(job.ops[opctx.index].result)
1301
1302           to_encode = errors.OpExecError("Preceding opcode failed")
1303           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1304                                 _EncodeOpError(to_encode))
1305           finalize = True
1306
1307           # Consistency check
1308           assert compat.all(i.status == constants.OP_STATUS_ERROR and
1309                             errors.GetEncodedError(i.result)
1310                             for i in job.ops[opctx.index:])
1311
1312         elif op.status == constants.OP_STATUS_CANCELING:
1313           job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1314                                 "Job canceled by request")
1315           finalize = True
1316
1317         else:
1318           raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1319
1320         if opctx.index == (opcount - 1):
1321           # Finalize on last opcode
1322           finalize = True
1323
1324         if finalize:
1325           # All opcodes have been run, finalize job
1326           job.Finalize()
1327
1328         # Write to disk. If the job status is final, this is the final write
1329         # allowed. Once the file has been written, it can be archived anytime.
1330         queue.UpdateJobUnlocked(job)
1331
1332         assert not waitjob
1333
1334         if finalize:
1335           logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1336           return self.FINISHED
1337
1338       assert not waitjob or queue.depmgr.JobWaiting(job)
1339
1340       if waitjob:
1341         return self.WAITDEP
1342       else:
1343         return self.DEFER
1344     finally:
1345       assert job.writable, "Job became read-only while being processed"
1346       queue.release()
1347
1348
1349 def _EvaluateJobProcessorResult(depmgr, job, result):
1350   """Looks at a result from L{_JobProcessor} for a job.
1351
1352   To be used in a L{_JobQueueWorker}.
1353
1354   """
1355   if result == _JobProcessor.FINISHED:
1356     # Notify waiting jobs
1357     depmgr.NotifyWaiters(job.id)
1358
1359   elif result == _JobProcessor.DEFER:
1360     # Schedule again
1361     raise workerpool.DeferTask(priority=job.CalcPriority())
1362
1363   elif result == _JobProcessor.WAITDEP:
1364     # No-op, dependency manager will re-schedule
1365     pass
1366
1367   else:
1368     raise errors.ProgrammerError("Job processor returned unknown status %s" %
1369                                  (result, ))
1370
1371
1372 class _JobQueueWorker(workerpool.BaseWorker):
1373   """The actual job workers.
1374
1375   """
1376   def RunTask(self, job): # pylint: disable=W0221
1377     """Job executor.
1378
1379     @type job: L{_QueuedJob}
1380     @param job: the job to be processed
1381
1382     """
1383     assert job.writable, "Expected writable job"
1384
1385     # Ensure only one worker is active on a single job. If a job registers for
1386     # a dependency job, and the other job notifies before the first worker is
1387     # done, the job can end up in the tasklist more than once.
1388     job.processor_lock.acquire()
1389     try:
1390       return self._RunTaskInner(job)
1391     finally:
1392       job.processor_lock.release()
1393
1394   def _RunTaskInner(self, job):
1395     """Executes a job.
1396
1397     Must be called with per-job lock acquired.
1398
1399     """
1400     queue = job.queue
1401     assert queue == self.pool.queue
1402
1403     setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1404     setname_fn(None)
1405
1406     proc = mcpu.Processor(queue.context, job.id)
1407
1408     # Create wrapper for setting thread name
1409     wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1410                                     proc.ExecOpCode)
1411
1412     _EvaluateJobProcessorResult(queue.depmgr, job,
1413                                 _JobProcessor(queue, wrap_execop_fn, job)())
1414
1415   @staticmethod
1416   def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1417     """Updates the worker thread name to include a short summary of the opcode.
1418
1419     @param setname_fn: Callable setting worker thread name
1420     @param execop_fn: Callable for executing opcode (usually
1421                       L{mcpu.Processor.ExecOpCode})
1422
1423     """
1424     setname_fn(op)
1425     try:
1426       return execop_fn(op, *args, **kwargs)
1427     finally:
1428       setname_fn(None)
1429
1430   @staticmethod
1431   def _GetWorkerName(job, op):
1432     """Sets the worker thread name.
1433
1434     @type job: L{_QueuedJob}
1435     @type op: L{opcodes.OpCode}
1436
1437     """
1438     parts = ["Job%s" % job.id]
1439
1440     if op:
1441       parts.append(op.TinySummary())
1442
1443     return "/".join(parts)
1444
1445
1446 class _JobQueueWorkerPool(workerpool.WorkerPool):
1447   """Simple class implementing a job-processing workerpool.
1448
1449   """
1450   def __init__(self, queue):
1451     super(_JobQueueWorkerPool, self).__init__("Jq",
1452                                               JOBQUEUE_THREADS,
1453                                               _JobQueueWorker)
1454     self.queue = queue
1455
1456
1457 class _JobDependencyManager:
1458   """Keeps track of job dependencies.
1459
1460   """
1461   (WAIT,
1462    ERROR,
1463    CANCEL,
1464    CONTINUE,
1465    WRONGSTATUS) = range(1, 6)
1466
1467   def __init__(self, getstatus_fn, enqueue_fn):
1468     """Initializes this class.
1469
1470     """
1471     self._getstatus_fn = getstatus_fn
1472     self._enqueue_fn = enqueue_fn
1473
1474     self._waiters = {}
1475     self._lock = locking.SharedLock("JobDepMgr")
1476
1477   @locking.ssynchronized(_LOCK, shared=1)
1478   def GetLockInfo(self, requested): # pylint: disable=W0613
1479     """Retrieves information about waiting jobs.
1480
1481     @type requested: set
1482     @param requested: Requested information, see C{query.LQ_*}
1483
1484     """
1485     # No need to sort here, that's being done by the lock manager and query
1486     # library. There are no priorities for notifying jobs, hence all show up as
1487     # one item under "pending".
1488     return [("job/%s" % job_id, None, None,
1489              [("job", [job.id for job in waiters])])
1490             for job_id, waiters in self._waiters.items()
1491             if waiters]
1492
1493   @locking.ssynchronized(_LOCK, shared=1)
1494   def JobWaiting(self, job):
1495     """Checks if a job is waiting.
1496
1497     """
1498     return compat.any(job in jobs
1499                       for jobs in self._waiters.values())
1500
1501   @locking.ssynchronized(_LOCK)
1502   def CheckAndRegister(self, job, dep_job_id, dep_status):
1503     """Checks if a dependency job has the requested status.
1504
1505     If the other job is not yet in a finalized status, the calling job will be
1506     notified (re-added to the workerpool) at a later point.
1507
1508     @type job: L{_QueuedJob}
1509     @param job: Job object
1510     @type dep_job_id: int
1511     @param dep_job_id: ID of dependency job
1512     @type dep_status: list
1513     @param dep_status: Required status
1514
1515     """
1516     assert ht.TJobId(job.id)
1517     assert ht.TJobId(dep_job_id)
1518     assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1519
1520     if job.id == dep_job_id:
1521       return (self.ERROR, "Job can't depend on itself")
1522
1523     # Get status of dependency job
1524     try:
1525       status = self._getstatus_fn(dep_job_id)
1526     except errors.JobLost, err:
1527       return (self.ERROR, "Dependency error: %s" % err)
1528
1529     assert status in constants.JOB_STATUS_ALL
1530
1531     job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1532
1533     if status not in constants.JOBS_FINALIZED:
1534       # Register for notification and wait for job to finish
1535       job_id_waiters.add(job)
1536       return (self.WAIT,
1537               "Need to wait for job %s, wanted status '%s'" %
1538               (dep_job_id, dep_status))
1539
1540     # Remove from waiters list
1541     if job in job_id_waiters:
1542       job_id_waiters.remove(job)
1543
1544     if (status == constants.JOB_STATUS_CANCELED and
1545         constants.JOB_STATUS_CANCELED not in dep_status):
1546       return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1547
1548     elif not dep_status or status in dep_status:
1549       return (self.CONTINUE,
1550               "Dependency job %s finished with status '%s'" %
1551               (dep_job_id, status))
1552
1553     else:
1554       return (self.WRONGSTATUS,
1555               "Dependency job %s finished with status '%s',"
1556               " not one of '%s' as required" %
1557               (dep_job_id, status, utils.CommaJoin(dep_status)))
1558
1559   def _RemoveEmptyWaitersUnlocked(self):
1560     """Remove all jobs without actual waiters.
1561
1562     """
1563     for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1564                    if not waiters]:
1565       del self._waiters[job_id]
1566
1567   def NotifyWaiters(self, job_id):
1568     """Notifies all jobs waiting for a certain job ID.
1569
1570     @attention: Do not call until L{CheckAndRegister} returned a status other
1571       than C{WAITDEP} for C{job_id}, or behaviour is undefined
1572     @type job_id: int
1573     @param job_id: Job ID
1574
1575     """
1576     assert ht.TJobId(job_id)
1577
1578     self._lock.acquire()
1579     try:
1580       self._RemoveEmptyWaitersUnlocked()
1581
1582       jobs = self._waiters.pop(job_id, None)
1583     finally:
1584       self._lock.release()
1585
1586     if jobs:
1587       # Re-add jobs to workerpool
1588       logging.debug("Re-adding %s jobs which were waiting for job %s",
1589                     len(jobs), job_id)
1590       self._enqueue_fn(jobs)
1591
1592
1593 def _RequireOpenQueue(fn):
1594   """Decorator for "public" functions.
1595
1596   This function should be used for all 'public' functions. That is,
1597   functions usually called from other classes. Note that this should
1598   be applied only to methods (not plain functions), since it expects
1599   that the decorated function is called with a first argument that has
1600   a '_queue_filelock' argument.
1601
1602   @warning: Use this decorator only after locking.ssynchronized
1603
1604   Example::
1605     @locking.ssynchronized(_LOCK)
1606     @_RequireOpenQueue
1607     def Example(self):
1608       pass
1609
1610   """
1611   def wrapper(self, *args, **kwargs):
1612     # pylint: disable=W0212
1613     assert self._queue_filelock is not None, "Queue should be open"
1614     return fn(self, *args, **kwargs)
1615   return wrapper
1616
1617
1618 def _RequireNonDrainedQueue(fn):
1619   """Decorator checking for a non-drained queue.
1620
1621   To be used with functions submitting new jobs.
1622
1623   """
1624   def wrapper(self, *args, **kwargs):
1625     """Wrapper function.
1626
1627     @raise errors.JobQueueDrainError: if the job queue is marked for draining
1628
1629     """
1630     # Ok when sharing the big job queue lock, as the drain file is created when
1631     # the lock is exclusive.
1632     # Needs access to protected member, pylint: disable=W0212
1633     if self._drained:
1634       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1635
1636     if not self._accepting_jobs:
1637       raise errors.JobQueueError("Job queue is shutting down, refusing job")
1638
1639     return fn(self, *args, **kwargs)
1640   return wrapper
1641
1642
1643 class JobQueue(object):
1644   """Queue used to manage the jobs.
1645
1646   """
1647   def __init__(self, context):
1648     """Constructor for JobQueue.
1649
1650     The constructor will initialize the job queue object and then
1651     start loading the current jobs from disk, either for starting them
1652     (if they were queue) or for aborting them (if they were already
1653     running).
1654
1655     @type context: GanetiContext
1656     @param context: the context object for access to the configuration
1657         data and other ganeti objects
1658
1659     """
1660     self.context = context
1661     self._memcache = weakref.WeakValueDictionary()
1662     self._my_hostname = netutils.Hostname.GetSysName()
1663
1664     # The Big JobQueue lock. If a code block or method acquires it in shared
1665     # mode safe it must guarantee concurrency with all the code acquiring it in
1666     # shared mode, including itself. In order not to acquire it at all
1667     # concurrency must be guaranteed with all code acquiring it in shared mode
1668     # and all code acquiring it exclusively.
1669     self._lock = locking.SharedLock("JobQueue")
1670
1671     self.acquire = self._lock.acquire
1672     self.release = self._lock.release
1673
1674     # Accept jobs by default
1675     self._accepting_jobs = True
1676
1677     # Initialize the queue, and acquire the filelock.
1678     # This ensures no other process is working on the job queue.
1679     self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1680
1681     # Read serial file
1682     self._last_serial = jstore.ReadSerial()
1683     assert self._last_serial is not None, ("Serial file was modified between"
1684                                            " check in jstore and here")
1685
1686     # Get initial list of nodes
1687     self._nodes = dict((n.name, n.primary_ip)
1688                        for n in self.context.cfg.GetAllNodesInfo().values()
1689                        if n.master_candidate)
1690
1691     # Remove master node
1692     self._nodes.pop(self._my_hostname, None)
1693
1694     # TODO: Check consistency across nodes
1695
1696     self._queue_size = None
1697     self._UpdateQueueSizeUnlocked()
1698     assert ht.TInt(self._queue_size)
1699     self._drained = jstore.CheckDrainFlag()
1700
1701     # Job dependencies
1702     self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1703                                         self._EnqueueJobs)
1704     self.context.glm.AddToLockMonitor(self.depmgr)
1705
1706     # Setup worker pool
1707     self._wpool = _JobQueueWorkerPool(self)
1708     try:
1709       self._InspectQueue()
1710     except:
1711       self._wpool.TerminateWorkers()
1712       raise
1713
1714   @locking.ssynchronized(_LOCK)
1715   @_RequireOpenQueue
1716   def _InspectQueue(self):
1717     """Loads the whole job queue and resumes unfinished jobs.
1718
1719     This function needs the lock here because WorkerPool.AddTask() may start a
1720     job while we're still doing our work.
1721
1722     """
1723     logging.info("Inspecting job queue")
1724
1725     restartjobs = []
1726
1727     all_job_ids = self._GetJobIDsUnlocked()
1728     jobs_count = len(all_job_ids)
1729     lastinfo = time.time()
1730     for idx, job_id in enumerate(all_job_ids):
1731       # Give an update every 1000 jobs or 10 seconds
1732       if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1733           idx == (jobs_count - 1)):
1734         logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1735                      idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1736         lastinfo = time.time()
1737
1738       job = self._LoadJobUnlocked(job_id)
1739
1740       # a failure in loading the job can cause 'None' to be returned
1741       if job is None:
1742         continue
1743
1744       status = job.CalcStatus()
1745
1746       if status == constants.JOB_STATUS_QUEUED:
1747         restartjobs.append(job)
1748
1749       elif status in (constants.JOB_STATUS_RUNNING,
1750                       constants.JOB_STATUS_WAITING,
1751                       constants.JOB_STATUS_CANCELING):
1752         logging.warning("Unfinished job %s found: %s", job.id, job)
1753
1754         if status == constants.JOB_STATUS_WAITING:
1755           # Restart job
1756           job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1757           restartjobs.append(job)
1758         else:
1759           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1760                                 "Unclean master daemon shutdown")
1761           job.Finalize()
1762
1763         self.UpdateJobUnlocked(job)
1764
1765     if restartjobs:
1766       logging.info("Restarting %s jobs", len(restartjobs))
1767       self._EnqueueJobsUnlocked(restartjobs)
1768
1769     logging.info("Job queue inspection finished")
1770
1771   def _GetRpc(self, address_list):
1772     """Gets RPC runner with context.
1773
1774     """
1775     return rpc.JobQueueRunner(self.context, address_list)
1776
1777   @locking.ssynchronized(_LOCK)
1778   @_RequireOpenQueue
1779   def AddNode(self, node):
1780     """Register a new node with the queue.
1781
1782     @type node: L{objects.Node}
1783     @param node: the node object to be added
1784
1785     """
1786     node_name = node.name
1787     assert node_name != self._my_hostname
1788
1789     # Clean queue directory on added node
1790     result = self._GetRpc(None).call_jobqueue_purge(node_name)
1791     msg = result.fail_msg
1792     if msg:
1793       logging.warning("Cannot cleanup queue directory on node %s: %s",
1794                       node_name, msg)
1795
1796     if not node.master_candidate:
1797       # remove if existing, ignoring errors
1798       self._nodes.pop(node_name, None)
1799       # and skip the replication of the job ids
1800       return
1801
1802     # Upload the whole queue excluding archived jobs
1803     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1804
1805     # Upload current serial file
1806     files.append(pathutils.JOB_QUEUE_SERIAL_FILE)
1807
1808     # Static address list
1809     addrs = [node.primary_ip]
1810
1811     for file_name in files:
1812       # Read file content
1813       content = utils.ReadFile(file_name)
1814
1815       result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
1816                              file_name, content)
1817       msg = result[node_name].fail_msg
1818       if msg:
1819         logging.error("Failed to upload file %s to node %s: %s",
1820                       file_name, node_name, msg)
1821
1822     # Set queue drained flag
1823     result = \
1824       self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name],
1825                                                        self._drained)
1826     msg = result[node_name].fail_msg
1827     if msg:
1828       logging.error("Failed to set queue drained flag on node %s: %s",
1829                     node_name, msg)
1830
1831     self._nodes[node_name] = node.primary_ip
1832
1833   @locking.ssynchronized(_LOCK)
1834   @_RequireOpenQueue
1835   def RemoveNode(self, node_name):
1836     """Callback called when removing nodes from the cluster.
1837
1838     @type node_name: str
1839     @param node_name: the name of the node to remove
1840
1841     """
1842     self._nodes.pop(node_name, None)
1843
1844   @staticmethod
1845   def _CheckRpcResult(result, nodes, failmsg):
1846     """Verifies the status of an RPC call.
1847
1848     Since we aim to keep consistency should this node (the current
1849     master) fail, we will log errors if our rpc fail, and especially
1850     log the case when more than half of the nodes fails.
1851
1852     @param result: the data as returned from the rpc call
1853     @type nodes: list
1854     @param nodes: the list of nodes we made the call to
1855     @type failmsg: str
1856     @param failmsg: the identifier to be used for logging
1857
1858     """
1859     failed = []
1860     success = []
1861
1862     for node in nodes:
1863       msg = result[node].fail_msg
1864       if msg:
1865         failed.append(node)
1866         logging.error("RPC call %s (%s) failed on node %s: %s",
1867                       result[node].call, failmsg, node, msg)
1868       else:
1869         success.append(node)
1870
1871     # +1 for the master node
1872     if (len(success) + 1) < len(failed):
1873       # TODO: Handle failing nodes
1874       logging.error("More than half of the nodes failed")
1875
1876   def _GetNodeIp(self):
1877     """Helper for returning the node name/ip list.
1878
1879     @rtype: (list, list)
1880     @return: a tuple of two lists, the first one with the node
1881         names and the second one with the node addresses
1882
1883     """
1884     # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1885     name_list = self._nodes.keys()
1886     addr_list = [self._nodes[name] for name in name_list]
1887     return name_list, addr_list
1888
1889   def _UpdateJobQueueFile(self, file_name, data, replicate):
1890     """Writes a file locally and then replicates it to all nodes.
1891
1892     This function will replace the contents of a file on the local
1893     node and then replicate it to all the other nodes we have.
1894
1895     @type file_name: str
1896     @param file_name: the path of the file to be replicated
1897     @type data: str
1898     @param data: the new contents of the file
1899     @type replicate: boolean
1900     @param replicate: whether to spread the changes to the remote nodes
1901
1902     """
1903     getents = runtime.GetEnts()
1904     utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1905                     gid=getents.daemons_gid,
1906                     mode=constants.JOB_QUEUE_FILES_PERMS)
1907
1908     if replicate:
1909       names, addrs = self._GetNodeIp()
1910       result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
1911       self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1912
1913   def _RenameFilesUnlocked(self, rename):
1914     """Renames a file locally and then replicate the change.
1915
1916     This function will rename a file in the local queue directory
1917     and then replicate this rename to all the other nodes we have.
1918
1919     @type rename: list of (old, new)
1920     @param rename: List containing tuples mapping old to new names
1921
1922     """
1923     # Rename them locally
1924     for old, new in rename:
1925       utils.RenameFile(old, new, mkdir=True)
1926
1927     # ... and on all nodes
1928     names, addrs = self._GetNodeIp()
1929     result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
1930     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1931
1932   def _NewSerialsUnlocked(self, count):
1933     """Generates a new job identifier.
1934
1935     Job identifiers are unique during the lifetime of a cluster.
1936
1937     @type count: integer
1938     @param count: how many serials to return
1939     @rtype: list of int
1940     @return: a list of job identifiers.
1941
1942     """
1943     assert ht.TNonNegativeInt(count)
1944
1945     # New number
1946     serial = self._last_serial + count
1947
1948     # Write to file
1949     self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
1950                              "%s\n" % serial, True)
1951
1952     result = [jstore.FormatJobID(v)
1953               for v in range(self._last_serial + 1, serial + 1)]
1954
1955     # Keep it only if we were able to write the file
1956     self._last_serial = serial
1957
1958     assert len(result) == count
1959
1960     return result
1961
1962   @staticmethod
1963   def _GetJobPath(job_id):
1964     """Returns the job file for a given job id.
1965
1966     @type job_id: str
1967     @param job_id: the job identifier
1968     @rtype: str
1969     @return: the path to the job file
1970
1971     """
1972     return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
1973
1974   @staticmethod
1975   def _GetArchivedJobPath(job_id):
1976     """Returns the archived job file for a give job id.
1977
1978     @type job_id: str
1979     @param job_id: the job identifier
1980     @rtype: str
1981     @return: the path to the archived job file
1982
1983     """
1984     return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
1985                           jstore.GetArchiveDirectory(job_id),
1986                           "job-%s" % job_id)
1987
1988   @staticmethod
1989   def _DetermineJobDirectories(archived):
1990     """Build list of directories containing job files.
1991
1992     @type archived: bool
1993     @param archived: Whether to include directories for archived jobs
1994     @rtype: list
1995
1996     """
1997     result = [pathutils.QUEUE_DIR]
1998
1999     if archived:
2000       archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
2001       result.extend(map(compat.partial(utils.PathJoin, archive_path),
2002                         utils.ListVisibleFiles(archive_path)))
2003
2004     return result
2005
2006   @classmethod
2007   def _GetJobIDsUnlocked(cls, sort=True, archived=False):
2008     """Return all known job IDs.
2009
2010     The method only looks at disk because it's a requirement that all
2011     jobs are present on disk (so in the _memcache we don't have any
2012     extra IDs).
2013
2014     @type sort: boolean
2015     @param sort: perform sorting on the returned job ids
2016     @rtype: list
2017     @return: the list of job IDs
2018
2019     """
2020     jlist = []
2021
2022     for path in cls._DetermineJobDirectories(archived):
2023       for filename in utils.ListVisibleFiles(path):
2024         m = constants.JOB_FILE_RE.match(filename)
2025         if m:
2026           jlist.append(int(m.group(1)))
2027
2028     if sort:
2029       jlist.sort()
2030     return jlist
2031
2032   def _LoadJobUnlocked(self, job_id):
2033     """Loads a job from the disk or memory.
2034
2035     Given a job id, this will return the cached job object if
2036     existing, or try to load the job from the disk. If loading from
2037     disk, it will also add the job to the cache.
2038
2039     @type job_id: int
2040     @param job_id: the job id
2041     @rtype: L{_QueuedJob} or None
2042     @return: either None or the job object
2043
2044     """
2045     job = self._memcache.get(job_id, None)
2046     if job:
2047       logging.debug("Found job %s in memcache", job_id)
2048       assert job.writable, "Found read-only job in memcache"
2049       return job
2050
2051     try:
2052       job = self._LoadJobFromDisk(job_id, False)
2053       if job is None:
2054         return job
2055     except errors.JobFileCorrupted:
2056       old_path = self._GetJobPath(job_id)
2057       new_path = self._GetArchivedJobPath(job_id)
2058       if old_path == new_path:
2059         # job already archived (future case)
2060         logging.exception("Can't parse job %s", job_id)
2061       else:
2062         # non-archived case
2063         logging.exception("Can't parse job %s, will archive.", job_id)
2064         self._RenameFilesUnlocked([(old_path, new_path)])
2065       return None
2066
2067     assert job.writable, "Job just loaded is not writable"
2068
2069     self._memcache[job_id] = job
2070     logging.debug("Added job %s to the cache", job_id)
2071     return job
2072
2073   def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
2074     """Load the given job file from disk.
2075
2076     Given a job file, read, load and restore it in a _QueuedJob format.
2077
2078     @type job_id: int
2079     @param job_id: job identifier
2080     @type try_archived: bool
2081     @param try_archived: Whether to try loading an archived job
2082     @rtype: L{_QueuedJob} or None
2083     @return: either None or the job object
2084
2085     """
2086     path_functions = [(self._GetJobPath, False)]
2087
2088     if try_archived:
2089       path_functions.append((self._GetArchivedJobPath, True))
2090
2091     raw_data = None
2092     archived = None
2093
2094     for (fn, archived) in path_functions:
2095       filepath = fn(job_id)
2096       logging.debug("Loading job from %s", filepath)
2097       try:
2098         raw_data = utils.ReadFile(filepath)
2099       except EnvironmentError, err:
2100         if err.errno != errno.ENOENT:
2101           raise
2102       else:
2103         break
2104
2105     if not raw_data:
2106       return None
2107
2108     if writable is None:
2109       writable = not archived
2110
2111     try:
2112       data = serializer.LoadJson(raw_data)
2113       job = _QueuedJob.Restore(self, data, writable, archived)
2114     except Exception, err: # pylint: disable=W0703
2115       raise errors.JobFileCorrupted(err)
2116
2117     return job
2118
2119   def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
2120     """Load the given job file from disk.
2121
2122     Given a job file, read, load and restore it in a _QueuedJob format.
2123     In case of error reading the job, it gets returned as None, and the
2124     exception is logged.
2125
2126     @type job_id: int
2127     @param job_id: job identifier
2128     @type try_archived: bool
2129     @param try_archived: Whether to try loading an archived job
2130     @rtype: L{_QueuedJob} or None
2131     @return: either None or the job object
2132
2133     """
2134     try:
2135       return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
2136     except (errors.JobFileCorrupted, EnvironmentError):
2137       logging.exception("Can't load/parse job %s", job_id)
2138       return None
2139
2140   def _UpdateQueueSizeUnlocked(self):
2141     """Update the queue size.
2142
2143     """
2144     self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
2145
2146   @locking.ssynchronized(_LOCK)
2147   @_RequireOpenQueue
2148   def SetDrainFlag(self, drain_flag):
2149     """Sets the drain flag for the queue.
2150
2151     @type drain_flag: boolean
2152     @param drain_flag: Whether to set or unset the drain flag
2153
2154     """
2155     # Change flag locally
2156     jstore.SetDrainFlag(drain_flag)
2157
2158     self._drained = drain_flag
2159
2160     # ... and on all nodes
2161     (names, addrs) = self._GetNodeIp()
2162     result = \
2163       self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag)
2164     self._CheckRpcResult(result, self._nodes,
2165                          "Setting queue drain flag to %s" % drain_flag)
2166
2167     return True
2168
2169   @_RequireOpenQueue
2170   def _SubmitJobUnlocked(self, job_id, ops):
2171     """Create and store a new job.
2172
2173     This enters the job into our job queue and also puts it on the new
2174     queue, in order for it to be picked up by the queue processors.
2175
2176     @type job_id: job ID
2177     @param job_id: the job ID for the new job
2178     @type ops: list
2179     @param ops: The list of OpCodes that will become the new job.
2180     @rtype: L{_QueuedJob}
2181     @return: the job object to be queued
2182     @raise errors.JobQueueFull: if the job queue has too many jobs in it
2183     @raise errors.GenericError: If an opcode is not valid
2184
2185     """
2186     if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
2187       raise errors.JobQueueFull()
2188
2189     job = _QueuedJob(self, job_id, ops, True)
2190
2191     for idx, op in enumerate(job.ops):
2192       # Check priority
2193       if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
2194         allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2195         raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
2196                                   " are %s" % (idx, op.priority, allowed))
2197
2198       # Check job dependencies
2199       dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
2200       if not opcodes.TNoRelativeJobDependencies(dependencies):
2201         raise errors.GenericError("Opcode %s has invalid dependencies, must"
2202                                   " match %s: %s" %
2203                                   (idx, opcodes.TNoRelativeJobDependencies,
2204                                    dependencies))
2205
2206     # Write to disk
2207     self.UpdateJobUnlocked(job)
2208
2209     self._queue_size += 1
2210
2211     logging.debug("Adding new job %s to the cache", job_id)
2212     self._memcache[job_id] = job
2213
2214     return job
2215
2216   @locking.ssynchronized(_LOCK)
2217   @_RequireOpenQueue
2218   @_RequireNonDrainedQueue
2219   def SubmitJob(self, ops):
2220     """Create and store a new job.
2221
2222     @see: L{_SubmitJobUnlocked}
2223
2224     """
2225     (job_id, ) = self._NewSerialsUnlocked(1)
2226     self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
2227     return job_id
2228
2229   @locking.ssynchronized(_LOCK)
2230   @_RequireOpenQueue
2231   @_RequireNonDrainedQueue
2232   def SubmitManyJobs(self, jobs):
2233     """Create and store multiple jobs.
2234
2235     @see: L{_SubmitJobUnlocked}
2236
2237     """
2238     all_job_ids = self._NewSerialsUnlocked(len(jobs))
2239
2240     (results, added_jobs) = \
2241       self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])
2242
2243     self._EnqueueJobsUnlocked(added_jobs)
2244
2245     return results
2246
2247   @staticmethod
2248   def _FormatSubmitError(msg, ops):
2249     """Formats errors which occurred while submitting a job.
2250
2251     """
2252     return ("%s; opcodes %s" %
2253             (msg, utils.CommaJoin(op.Summary() for op in ops)))
2254
2255   @staticmethod
2256   def _ResolveJobDependencies(resolve_fn, deps):
2257     """Resolves relative job IDs in dependencies.
2258
2259     @type resolve_fn: callable
2260     @param resolve_fn: Function to resolve a relative job ID
2261     @type deps: list
2262     @param deps: Dependencies
2263     @rtype: tuple; (boolean, string or list)
2264     @return: If successful (first tuple item), the returned list contains
2265       resolved job IDs along with the requested status; if not successful,
2266       the second element is an error message
2267
2268     """
2269     result = []
2270
2271     for (dep_job_id, dep_status) in deps:
2272       if ht.TRelativeJobId(dep_job_id):
2273         assert ht.TInt(dep_job_id) and dep_job_id < 0
2274         try:
2275           job_id = resolve_fn(dep_job_id)
2276         except IndexError:
2277           # Abort
2278           return (False, "Unable to resolve relative job ID %s" % dep_job_id)
2279       else:
2280         job_id = dep_job_id
2281
2282       result.append((job_id, dep_status))
2283
2284     return (True, result)
2285
2286   def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
2287     """Create and store multiple jobs.
2288
2289     @see: L{_SubmitJobUnlocked}
2290
2291     """
2292     results = []
2293     added_jobs = []
2294
2295     def resolve_fn(job_idx, reljobid):
2296       assert reljobid < 0
2297       return (previous_job_ids + job_ids[:job_idx])[reljobid]
2298
2299     for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
2300       for op in ops:
2301         if getattr(op, opcodes.DEPEND_ATTR, None):
2302           (status, data) = \
2303             self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
2304                                          op.depends)
2305           if not status:
2306             # Abort resolving dependencies
2307             assert ht.TNonEmptyString(data), "No error message"
2308             break
2309           # Use resolved dependencies
2310           op.depends = data
2311       else:
2312         try:
2313           job = self._SubmitJobUnlocked(job_id, ops)
2314         except errors.GenericError, err:
2315           status = False
2316           data = self._FormatSubmitError(str(err), ops)
2317         else:
2318           status = True
2319           data = job_id
2320           added_jobs.append(job)
2321
2322       results.append((status, data))
2323
2324     return (results, added_jobs)
2325
2326   @locking.ssynchronized(_LOCK)
2327   def _EnqueueJobs(self, jobs):
2328     """Helper function to add jobs to worker pool's queue.
2329
2330     @type jobs: list
2331     @param jobs: List of all jobs
2332
2333     """
2334     return self._EnqueueJobsUnlocked(jobs)
2335
2336   def _EnqueueJobsUnlocked(self, jobs):
2337     """Helper function to add jobs to worker pool's queue.
2338
2339     @type jobs: list
2340     @param jobs: List of all jobs
2341
2342     """
2343     assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
2344     self._wpool.AddManyTasks([(job, ) for job in jobs],
2345                              priority=[job.CalcPriority() for job in jobs],
2346                              task_id=map(_GetIdAttr, jobs))
2347
2348   def _GetJobStatusForDependencies(self, job_id):
2349     """Gets the status of a job for dependencies.
2350
2351     @type job_id: int
2352     @param job_id: Job ID
2353     @raise errors.JobLost: If job can't be found
2354
2355     """
2356     # Not using in-memory cache as doing so would require an exclusive lock
2357
2358     # Try to load from disk
2359     job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2360
2361     assert not job.writable, "Got writable job" # pylint: disable=E1101
2362
2363     if job:
2364       return job.CalcStatus()
2365
2366     raise errors.JobLost("Job %s not found" % job_id)
2367
2368   @_RequireOpenQueue
2369   def UpdateJobUnlocked(self, job, replicate=True):
2370     """Update a job's on disk storage.
2371
2372     After a job has been modified, this function needs to be called in
2373     order to write the changes to disk and replicate them to the other
2374     nodes.
2375
2376     @type job: L{_QueuedJob}
2377     @param job: the changed job
2378     @type replicate: boolean
2379     @param replicate: whether to replicate the change to remote nodes
2380
2381     """
2382     if __debug__:
2383       finalized = job.CalcStatus() in constants.JOBS_FINALIZED
2384       assert (finalized ^ (job.end_timestamp is None))
2385       assert job.writable, "Can't update read-only job"
2386       assert not job.archived, "Can't update archived job"
2387
2388     filename = self._GetJobPath(job.id)
2389     data = serializer.DumpJson(job.Serialize())
2390     logging.debug("Writing job %s to %s", job.id, filename)
2391     self._UpdateJobQueueFile(filename, data, replicate)
2392
2393   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
2394                         timeout):
2395     """Waits for changes in a job.
2396
2397     @type job_id: int
2398     @param job_id: Job identifier
2399     @type fields: list of strings
2400     @param fields: Which fields to check for changes
2401     @type prev_job_info: list or None
2402     @param prev_job_info: Last job information returned
2403     @type prev_log_serial: int
2404     @param prev_log_serial: Last job message serial number
2405     @type timeout: float
2406     @param timeout: maximum time to wait in seconds
2407     @rtype: tuple (job info, log entries)
2408     @return: a tuple of the job information as required via
2409         the fields parameter, and the log entries as a list
2410
2411         if the job has not changed and the timeout has expired,
2412         we instead return a special value,
2413         L{constants.JOB_NOTCHANGED}, which should be interpreted
2414         as such by the clients
2415
2416     """
2417     load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
2418                              writable=False)
2419
2420     helper = _WaitForJobChangesHelper()
2421
2422     return helper(self._GetJobPath(job_id), load_fn,
2423                   fields, prev_job_info, prev_log_serial, timeout)
2424
2425   @locking.ssynchronized(_LOCK)
2426   @_RequireOpenQueue
2427   def CancelJob(self, job_id):
2428     """Cancels a job.
2429
2430     This will only succeed if the job has not started yet.
2431
2432     @type job_id: int
2433     @param job_id: job ID of job to be cancelled.
2434
2435     """
2436     logging.info("Cancelling job %s", job_id)
2437
2438     return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
2439
2440   @locking.ssynchronized(_LOCK)
2441   @_RequireOpenQueue
2442   def ChangeJobPriority(self, job_id, priority):
2443     """Changes a job's priority.
2444
2445     @type job_id: int
2446     @param job_id: ID of the job whose priority should be changed
2447     @type priority: int
2448     @param priority: New priority
2449
2450     """
2451     logging.info("Changing priority of job %s to %s", job_id, priority)
2452
2453     if priority not in constants.OP_PRIO_SUBMIT_VALID:
2454       allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2455       raise errors.GenericError("Invalid priority %s, allowed are %s" %
2456                                 (priority, allowed))
2457
2458     def fn(job):
2459       (success, msg) = job.ChangePriority(priority)
2460
2461       if success:
2462         try:
2463           self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
2464         except workerpool.NoSuchTask:
2465           logging.debug("Job %s is not in workerpool at this time", job.id)
2466
2467       return (success, msg)
2468
2469     return self._ModifyJobUnlocked(job_id, fn)
2470
2471   def _ModifyJobUnlocked(self, job_id, mod_fn):
2472     """Modifies a job.
2473
2474     @type job_id: int
2475     @param job_id: Job ID
2476     @type mod_fn: callable
2477     @param mod_fn: Modifying function, receiving job object as parameter,
2478       returning tuple of (status boolean, message string)
2479
2480     """
2481     job = self._LoadJobUnlocked(job_id)
2482     if not job:
2483       logging.debug("Job %s not found", job_id)
2484       return (False, "Job %s not found" % job_id)
2485
2486     assert job.writable, "Can't modify read-only job"
2487     assert not job.archived, "Can't modify archived job"
2488
2489     (success, msg) = mod_fn(job)
2490
2491     if success:
2492       # If the job was finalized (e.g. cancelled), this is the final write
2493       # allowed. The job can be archived anytime.
2494       self.UpdateJobUnlocked(job)
2495
2496     return (success, msg)
2497
2498   @_RequireOpenQueue
2499   def _ArchiveJobsUnlocked(self, jobs):
2500     """Archives jobs.
2501
2502     @type jobs: list of L{_QueuedJob}
2503     @param jobs: Job objects
2504     @rtype: int
2505     @return: Number of archived jobs
2506
2507     """
2508     archive_jobs = []
2509     rename_files = []
2510     for job in jobs:
2511       assert job.writable, "Can't archive read-only job"
2512       assert not job.archived, "Can't cancel archived job"
2513
2514       if job.CalcStatus() not in constants.JOBS_FINALIZED:
2515         logging.debug("Job %s is not yet done", job.id)
2516         continue
2517
2518       archive_jobs.append(job)
2519
2520       old = self._GetJobPath(job.id)
2521       new = self._GetArchivedJobPath(job.id)
2522       rename_files.append((old, new))
2523
2524     # TODO: What if 1..n files fail to rename?
2525     self._RenameFilesUnlocked(rename_files)
2526
2527     logging.debug("Successfully archived job(s) %s",
2528                   utils.CommaJoin(job.id for job in archive_jobs))
2529
2530     # Since we haven't quite checked, above, if we succeeded or failed renaming
2531     # the files, we update the cached queue size from the filesystem. When we
2532     # get around to fix the TODO: above, we can use the number of actually
2533     # archived jobs to fix this.
2534     self._UpdateQueueSizeUnlocked()
2535     return len(archive_jobs)
2536
2537   @locking.ssynchronized(_LOCK)
2538   @_RequireOpenQueue
2539   def ArchiveJob(self, job_id):
2540     """Archives a job.
2541
2542     This is just a wrapper over L{_ArchiveJobsUnlocked}.
2543
2544     @type job_id: int
2545     @param job_id: Job ID of job to be archived.
2546     @rtype: bool
2547     @return: Whether job was archived
2548
2549     """
2550     logging.info("Archiving job %s", job_id)
2551
2552     job = self._LoadJobUnlocked(job_id)
2553     if not job:
2554       logging.debug("Job %s not found", job_id)
2555       return False
2556
2557     return self._ArchiveJobsUnlocked([job]) == 1
2558
2559   @locking.ssynchronized(_LOCK)
2560   @_RequireOpenQueue
2561   def AutoArchiveJobs(self, age, timeout):
2562     """Archives all jobs based on age.
2563
2564     The method will archive all jobs which are older than the age
2565     parameter. For jobs that don't have an end timestamp, the start
2566     timestamp will be considered. The special '-1' age will cause
2567     archival of all jobs (that are not running or queued).
2568
2569     @type age: int
2570     @param age: the minimum age in seconds
2571
2572     """
2573     logging.info("Archiving jobs with age more than %s seconds", age)
2574
2575     now = time.time()
2576     end_time = now + timeout
2577     archived_count = 0
2578     last_touched = 0
2579
2580     all_job_ids = self._GetJobIDsUnlocked()
2581     pending = []
2582     for idx, job_id in enumerate(all_job_ids):
2583       last_touched = idx + 1
2584
2585       # Not optimal because jobs could be pending
2586       # TODO: Measure average duration for job archival and take number of
2587       # pending jobs into account.
2588       if time.time() > end_time:
2589         break
2590
2591       # Returns None if the job failed to load
2592       job = self._LoadJobUnlocked(job_id)
2593       if job:
2594         if job.end_timestamp is None:
2595           if job.start_timestamp is None:
2596             job_age = job.received_timestamp
2597           else:
2598             job_age = job.start_timestamp
2599         else:
2600           job_age = job.end_timestamp
2601
2602         if age == -1 or now - job_age[0] > age:
2603           pending.append(job)
2604
2605           # Archive 10 jobs at a time
2606           if len(pending) >= 10:
2607             archived_count += self._ArchiveJobsUnlocked(pending)
2608             pending = []
2609
2610     if pending:
2611       archived_count += self._ArchiveJobsUnlocked(pending)
2612
2613     return (archived_count, len(all_job_ids) - last_touched)
2614
2615   def _Query(self, fields, qfilter):
2616     qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
2617                        namefield="id")
2618
2619     # Archived jobs are only looked at if the "archived" field is referenced
2620     # either as a requested field or in the filter. By default archived jobs
2621     # are ignored.
2622     include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())
2623
2624     job_ids = qobj.RequestedNames()
2625
2626     list_all = (job_ids is None)
2627
2628     if list_all:
2629       # Since files are added to/removed from the queue atomically, there's no
2630       # risk of getting the job ids in an inconsistent state.
2631       job_ids = self._GetJobIDsUnlocked(archived=include_archived)
2632
2633     jobs = []
2634
2635     for job_id in job_ids:
2636       job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2637       if job is not None or not list_all:
2638         jobs.append((job_id, job))
2639
2640     return (qobj, jobs, list_all)
2641
2642   def QueryJobs(self, fields, qfilter):
2643     """Returns a list of jobs in queue.
2644
2645     @type fields: sequence
2646     @param fields: List of wanted fields
2647     @type qfilter: None or query2 filter (list)
2648     @param qfilter: Query filter
2649
2650     """
2651     (qobj, ctx, _) = self._Query(fields, qfilter)
2652
2653     return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
2654
2655   def OldStyleQueryJobs(self, job_ids, fields):
2656     """Returns a list of jobs in queue.
2657
2658     @type job_ids: list
2659     @param job_ids: sequence of job identifiers or None for all
2660     @type fields: list
2661     @param fields: names of fields to return
2662     @rtype: list
2663     @return: list one element per job, each element being list with
2664         the requested fields
2665
2666     """
2667     # backwards compat:
2668     job_ids = [int(jid) for jid in job_ids]
2669     qfilter = qlang.MakeSimpleFilter("id", job_ids)
2670
2671     (qobj, ctx, _) = self._Query(fields, qfilter)
2672
2673     return qobj.OldStyleQuery(ctx, sort_by_name=False)
2674
2675   @locking.ssynchronized(_LOCK)
2676   def PrepareShutdown(self):
2677     """Prepare to stop the job queue.
2678
2679     Disables execution of jobs in the workerpool and returns whether there are
2680     any jobs currently running. If the latter is the case, the job queue is not
2681     yet ready for shutdown. Once this function returns C{True} L{Shutdown} can
2682     be called without interfering with any job. Queued and unfinished jobs will
2683     be resumed next time.
2684
2685     Once this function has been called no new job submissions will be accepted
2686     (see L{_RequireNonDrainedQueue}).
2687
2688     @rtype: bool
2689     @return: Whether there are any running jobs
2690
2691     """
2692     if self._accepting_jobs:
2693       self._accepting_jobs = False
2694
2695       # Tell worker pool to stop processing pending tasks
2696       self._wpool.SetActive(False)
2697
2698     return self._wpool.HasRunningTasks()
2699
2700   def AcceptingJobsUnlocked(self):
2701     """Returns whether jobs are accepted.
2702
2703     Once L{PrepareShutdown} has been called, no new jobs are accepted and the
2704     queue is shutting down.
2705
2706     @rtype: bool
2707
2708     """
2709     return self._accepting_jobs
2710
2711   @locking.ssynchronized(_LOCK)
2712   @_RequireOpenQueue
2713   def Shutdown(self):
2714     """Stops the job queue.
2715
2716     This shutdowns all the worker threads an closes the queue.
2717
2718     """
2719     self._wpool.TerminateWorkers()
2720
2721     self._queue_filelock.Close()
2722     self._queue_filelock = None