Fix job queue directory permission problems
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import logging
33 import errno
34 import time
35 import weakref
36 import threading
37 import itertools
38 import operator
39
40 try:
41   # pylint: disable=E0611
42   from pyinotify import pyinotify
43 except ImportError:
44   import pyinotify
45
46 from ganeti import asyncnotifier
47 from ganeti import constants
48 from ganeti import serializer
49 from ganeti import workerpool
50 from ganeti import locking
51 from ganeti import opcodes
52 from ganeti import errors
53 from ganeti import mcpu
54 from ganeti import utils
55 from ganeti import jstore
56 from ganeti import rpc
57 from ganeti import runtime
58 from ganeti import netutils
59 from ganeti import compat
60 from ganeti import ht
61 from ganeti import query
62 from ganeti import qlang
63 from ganeti import pathutils
64 from ganeti import vcluster
65
66
67 JOBQUEUE_THREADS = 25
68
69 # member lock names to be passed to @ssynchronized decorator
70 _LOCK = "_lock"
71 _QUEUE = "_queue"
72
73 #: Retrieves "id" attribute
74 _GetIdAttr = operator.attrgetter("id")
75
76
77 class CancelJob(Exception):
78   """Special exception to cancel a job.
79
80   """
81
82
83 class QueueShutdown(Exception):
84   """Special exception to abort a job when the job queue is shutting down.
85
86   """
87
88
89 def TimeStampNow():
90   """Returns the current timestamp.
91
92   @rtype: tuple
93   @return: the current time in the (seconds, microseconds) format
94
95   """
96   return utils.SplitTime(time.time())
97
98
99 def _CallJqUpdate(runner, names, file_name, content):
100   """Updates job queue file after virtualizing filename.
101
102   """
103   virt_file_name = vcluster.MakeVirtualPath(file_name)
104   return runner.call_jobqueue_update(names, virt_file_name, content)
105
106
107 class _SimpleJobQuery:
108   """Wrapper for job queries.
109
110   Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.
111
112   """
113   def __init__(self, fields):
114     """Initializes this class.
115
116     """
117     self._query = query.Query(query.JOB_FIELDS, fields)
118
119   def __call__(self, job):
120     """Executes a job query using cached field list.
121
122     """
123     return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
124
125
126 class _QueuedOpCode(object):
127   """Encapsulates an opcode object.
128
129   @ivar log: holds the execution log and consists of tuples
130   of the form C{(log_serial, timestamp, level, message)}
131   @ivar input: the OpCode we encapsulate
132   @ivar status: the current status
133   @ivar result: the result of the LU execution
134   @ivar start_timestamp: timestamp for the start of the execution
135   @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
136   @ivar stop_timestamp: timestamp for the end of the execution
137
138   """
139   __slots__ = ["input", "status", "result", "log", "priority",
140                "start_timestamp", "exec_timestamp", "end_timestamp",
141                "__weakref__"]
142
143   def __init__(self, op):
144     """Initializes instances of this class.
145
146     @type op: L{opcodes.OpCode}
147     @param op: the opcode we encapsulate
148
149     """
150     self.input = op
151     self.status = constants.OP_STATUS_QUEUED
152     self.result = None
153     self.log = []
154     self.start_timestamp = None
155     self.exec_timestamp = None
156     self.end_timestamp = None
157
158     # Get initial priority (it might change during the lifetime of this opcode)
159     self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
160
161   @classmethod
162   def Restore(cls, state):
163     """Restore the _QueuedOpCode from the serialized form.
164
165     @type state: dict
166     @param state: the serialized state
167     @rtype: _QueuedOpCode
168     @return: a new _QueuedOpCode instance
169
170     """
171     obj = _QueuedOpCode.__new__(cls)
172     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
173     obj.status = state["status"]
174     obj.result = state["result"]
175     obj.log = state["log"]
176     obj.start_timestamp = state.get("start_timestamp", None)
177     obj.exec_timestamp = state.get("exec_timestamp", None)
178     obj.end_timestamp = state.get("end_timestamp", None)
179     obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
180     return obj
181
182   def Serialize(self):
183     """Serializes this _QueuedOpCode.
184
185     @rtype: dict
186     @return: the dictionary holding the serialized state
187
188     """
189     return {
190       "input": self.input.__getstate__(),
191       "status": self.status,
192       "result": self.result,
193       "log": self.log,
194       "start_timestamp": self.start_timestamp,
195       "exec_timestamp": self.exec_timestamp,
196       "end_timestamp": self.end_timestamp,
197       "priority": self.priority,
198       }
199
200
201 class _QueuedJob(object):
202   """In-memory job representation.
203
204   This is what we use to track the user-submitted jobs. Locking must
205   be taken care of by users of this class.
206
207   @type queue: L{JobQueue}
208   @ivar queue: the parent queue
209   @ivar id: the job ID
210   @type ops: list
211   @ivar ops: the list of _QueuedOpCode that constitute the job
212   @type log_serial: int
213   @ivar log_serial: holds the index for the next log entry
214   @ivar received_timestamp: the timestamp for when the job was received
215   @ivar start_timestmap: the timestamp for start of execution
216   @ivar end_timestamp: the timestamp for end of execution
217   @ivar writable: Whether the job is allowed to be modified
218
219   """
220   # pylint: disable=W0212
221   __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
222                "received_timestamp", "start_timestamp", "end_timestamp",
223                "__weakref__", "processor_lock", "writable", "archived"]
224
225   def __init__(self, queue, job_id, ops, writable):
226     """Constructor for the _QueuedJob.
227
228     @type queue: L{JobQueue}
229     @param queue: our parent queue
230     @type job_id: job_id
231     @param job_id: our job id
232     @type ops: list
233     @param ops: the list of opcodes we hold, which will be encapsulated
234         in _QueuedOpCodes
235     @type writable: bool
236     @param writable: Whether job can be modified
237
238     """
239     if not ops:
240       raise errors.GenericError("A job needs at least one opcode")
241
242     self.queue = queue
243     self.id = int(job_id)
244     self.ops = [_QueuedOpCode(op) for op in ops]
245     self.log_serial = 0
246     self.received_timestamp = TimeStampNow()
247     self.start_timestamp = None
248     self.end_timestamp = None
249     self.archived = False
250
251     self._InitInMemory(self, writable)
252
253     assert not self.archived, "New jobs can not be marked as archived"
254
255   @staticmethod
256   def _InitInMemory(obj, writable):
257     """Initializes in-memory variables.
258
259     """
260     obj.writable = writable
261     obj.ops_iter = None
262     obj.cur_opctx = None
263
264     # Read-only jobs are not processed and therefore don't need a lock
265     if writable:
266       obj.processor_lock = threading.Lock()
267     else:
268       obj.processor_lock = None
269
270   def __repr__(self):
271     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
272               "id=%s" % self.id,
273               "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
274
275     return "<%s at %#x>" % (" ".join(status), id(self))
276
277   @classmethod
278   def Restore(cls, queue, state, writable, archived):
279     """Restore a _QueuedJob from serialized state:
280
281     @type queue: L{JobQueue}
282     @param queue: to which queue the restored job belongs
283     @type state: dict
284     @param state: the serialized state
285     @type writable: bool
286     @param writable: Whether job can be modified
287     @type archived: bool
288     @param archived: Whether job was already archived
289     @rtype: _JobQueue
290     @return: the restored _JobQueue instance
291
292     """
293     obj = _QueuedJob.__new__(cls)
294     obj.queue = queue
295     obj.id = int(state["id"])
296     obj.received_timestamp = state.get("received_timestamp", None)
297     obj.start_timestamp = state.get("start_timestamp", None)
298     obj.end_timestamp = state.get("end_timestamp", None)
299     obj.archived = archived
300
301     obj.ops = []
302     obj.log_serial = 0
303     for op_state in state["ops"]:
304       op = _QueuedOpCode.Restore(op_state)
305       for log_entry in op.log:
306         obj.log_serial = max(obj.log_serial, log_entry[0])
307       obj.ops.append(op)
308
309     cls._InitInMemory(obj, writable)
310
311     return obj
312
313   def Serialize(self):
314     """Serialize the _JobQueue instance.
315
316     @rtype: dict
317     @return: the serialized state
318
319     """
320     return {
321       "id": self.id,
322       "ops": [op.Serialize() for op in self.ops],
323       "start_timestamp": self.start_timestamp,
324       "end_timestamp": self.end_timestamp,
325       "received_timestamp": self.received_timestamp,
326       }
327
328   def CalcStatus(self):
329     """Compute the status of this job.
330
331     This function iterates over all the _QueuedOpCodes in the job and
332     based on their status, computes the job status.
333
334     The algorithm is:
335       - if we find a cancelled, or finished with error, the job
336         status will be the same
337       - otherwise, the last opcode with the status one of:
338           - waitlock
339           - canceling
340           - running
341
342         will determine the job status
343
344       - otherwise, it means either all opcodes are queued, or success,
345         and the job status will be the same
346
347     @return: the job status
348
349     """
350     status = constants.JOB_STATUS_QUEUED
351
352     all_success = True
353     for op in self.ops:
354       if op.status == constants.OP_STATUS_SUCCESS:
355         continue
356
357       all_success = False
358
359       if op.status == constants.OP_STATUS_QUEUED:
360         pass
361       elif op.status == constants.OP_STATUS_WAITING:
362         status = constants.JOB_STATUS_WAITING
363       elif op.status == constants.OP_STATUS_RUNNING:
364         status = constants.JOB_STATUS_RUNNING
365       elif op.status == constants.OP_STATUS_CANCELING:
366         status = constants.JOB_STATUS_CANCELING
367         break
368       elif op.status == constants.OP_STATUS_ERROR:
369         status = constants.JOB_STATUS_ERROR
370         # The whole job fails if one opcode failed
371         break
372       elif op.status == constants.OP_STATUS_CANCELED:
373         status = constants.OP_STATUS_CANCELED
374         break
375
376     if all_success:
377       status = constants.JOB_STATUS_SUCCESS
378
379     return status
380
381   def CalcPriority(self):
382     """Gets the current priority for this job.
383
384     Only unfinished opcodes are considered. When all are done, the default
385     priority is used.
386
387     @rtype: int
388
389     """
390     priorities = [op.priority for op in self.ops
391                   if op.status not in constants.OPS_FINALIZED]
392
393     if not priorities:
394       # All opcodes are done, assume default priority
395       return constants.OP_PRIO_DEFAULT
396
397     return min(priorities)
398
399   def GetLogEntries(self, newer_than):
400     """Selectively returns the log entries.
401
402     @type newer_than: None or int
403     @param newer_than: if this is None, return all log entries,
404         otherwise return only the log entries with serial higher
405         than this value
406     @rtype: list
407     @return: the list of the log entries selected
408
409     """
410     if newer_than is None:
411       serial = -1
412     else:
413       serial = newer_than
414
415     entries = []
416     for op in self.ops:
417       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
418
419     return entries
420
421   def GetInfo(self, fields):
422     """Returns information about a job.
423
424     @type fields: list
425     @param fields: names of fields to return
426     @rtype: list
427     @return: list with one element for each field
428     @raise errors.OpExecError: when an invalid field
429         has been passed
430
431     """
432     return _SimpleJobQuery(fields)(self)
433
434   def MarkUnfinishedOps(self, status, result):
435     """Mark unfinished opcodes with a given status and result.
436
437     This is an utility function for marking all running or waiting to
438     be run opcodes with a given status. Opcodes which are already
439     finalised are not changed.
440
441     @param status: a given opcode status
442     @param result: the opcode result
443
444     """
445     not_marked = True
446     for op in self.ops:
447       if op.status in constants.OPS_FINALIZED:
448         assert not_marked, "Finalized opcodes found after non-finalized ones"
449         continue
450       op.status = status
451       op.result = result
452       not_marked = False
453
454   def Finalize(self):
455     """Marks the job as finalized.
456
457     """
458     self.end_timestamp = TimeStampNow()
459
460   def Cancel(self):
461     """Marks job as canceled/-ing if possible.
462
463     @rtype: tuple; (bool, string)
464     @return: Boolean describing whether job was successfully canceled or marked
465       as canceling and a text message
466
467     """
468     status = self.CalcStatus()
469
470     if status == constants.JOB_STATUS_QUEUED:
471       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
472                              "Job canceled by request")
473       self.Finalize()
474       return (True, "Job %s canceled" % self.id)
475
476     elif status == constants.JOB_STATUS_WAITING:
477       # The worker will notice the new status and cancel the job
478       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
479       return (True, "Job %s will be canceled" % self.id)
480
481     else:
482       logging.debug("Job %s is no longer waiting in the queue", self.id)
483       return (False, "Job %s is no longer waiting in the queue" % self.id)
484
485   def ChangePriority(self, priority):
486     """Changes the job priority.
487
488     @type priority: int
489     @param priority: New priority
490     @rtype: tuple; (bool, string)
491     @return: Boolean describing whether job's priority was successfully changed
492       and a text message
493
494     """
495     status = self.CalcStatus()
496
497     if status in constants.JOBS_FINALIZED:
498       return (False, "Job %s is finished" % self.id)
499     elif status == constants.JOB_STATUS_CANCELING:
500       return (False, "Job %s is cancelling" % self.id)
501     else:
502       assert status in (constants.JOB_STATUS_QUEUED,
503                         constants.JOB_STATUS_WAITING,
504                         constants.JOB_STATUS_RUNNING)
505
506       changed = False
507       for op in self.ops:
508         if (op.status == constants.OP_STATUS_RUNNING or
509             op.status in constants.OPS_FINALIZED):
510           assert not changed, \
511             ("Found opcode for which priority should not be changed after"
512              " priority has been changed for previous opcodes")
513           continue
514
515         assert op.status in (constants.OP_STATUS_QUEUED,
516                              constants.OP_STATUS_WAITING)
517
518         changed = True
519
520         # Set new priority (doesn't modify opcode input)
521         op.priority = priority
522
523       if changed:
524         return (True, ("Priorities of pending opcodes for job %s have been"
525                        " changed to %s" % (self.id, priority)))
526       else:
527         return (False, "Job %s had no pending opcodes" % self.id)
528
529
530 class _OpExecCallbacks(mcpu.OpExecCbBase):
531   def __init__(self, queue, job, op):
532     """Initializes this class.
533
534     @type queue: L{JobQueue}
535     @param queue: Job queue
536     @type job: L{_QueuedJob}
537     @param job: Job object
538     @type op: L{_QueuedOpCode}
539     @param op: OpCode
540
541     """
542     assert queue, "Queue is missing"
543     assert job, "Job is missing"
544     assert op, "Opcode is missing"
545
546     self._queue = queue
547     self._job = job
548     self._op = op
549
550   def _CheckCancel(self):
551     """Raises an exception to cancel the job if asked to.
552
553     """
554     # Cancel here if we were asked to
555     if self._op.status == constants.OP_STATUS_CANCELING:
556       logging.debug("Canceling opcode")
557       raise CancelJob()
558
559     # See if queue is shutting down
560     if not self._queue.AcceptingJobsUnlocked():
561       logging.debug("Queue is shutting down")
562       raise QueueShutdown()
563
564   @locking.ssynchronized(_QUEUE, shared=1)
565   def NotifyStart(self):
566     """Mark the opcode as running, not lock-waiting.
567
568     This is called from the mcpu code as a notifier function, when the LU is
569     finally about to start the Exec() method. Of course, to have end-user
570     visible results, the opcode must be initially (before calling into
571     Processor.ExecOpCode) set to OP_STATUS_WAITING.
572
573     """
574     assert self._op in self._job.ops
575     assert self._op.status in (constants.OP_STATUS_WAITING,
576                                constants.OP_STATUS_CANCELING)
577
578     # Cancel here if we were asked to
579     self._CheckCancel()
580
581     logging.debug("Opcode is now running")
582
583     self._op.status = constants.OP_STATUS_RUNNING
584     self._op.exec_timestamp = TimeStampNow()
585
586     # And finally replicate the job status
587     self._queue.UpdateJobUnlocked(self._job)
588
589   @locking.ssynchronized(_QUEUE, shared=1)
590   def _AppendFeedback(self, timestamp, log_type, log_msg):
591     """Internal feedback append function, with locks
592
593     """
594     self._job.log_serial += 1
595     self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
596     self._queue.UpdateJobUnlocked(self._job, replicate=False)
597
598   def Feedback(self, *args):
599     """Append a log entry.
600
601     """
602     assert len(args) < 3
603
604     if len(args) == 1:
605       log_type = constants.ELOG_MESSAGE
606       log_msg = args[0]
607     else:
608       (log_type, log_msg) = args
609
610     # The time is split to make serialization easier and not lose
611     # precision.
612     timestamp = utils.SplitTime(time.time())
613     self._AppendFeedback(timestamp, log_type, log_msg)
614
615   def CurrentPriority(self):
616     """Returns current priority for opcode.
617
618     """
619     assert self._op.status in (constants.OP_STATUS_WAITING,
620                                constants.OP_STATUS_CANCELING)
621
622     # Cancel here if we were asked to
623     self._CheckCancel()
624
625     return self._op.priority
626
627   def SubmitManyJobs(self, jobs):
628     """Submits jobs for processing.
629
630     See L{JobQueue.SubmitManyJobs}.
631
632     """
633     # Locking is done in job queue
634     return self._queue.SubmitManyJobs(jobs)
635
636
637 class _JobChangesChecker(object):
638   def __init__(self, fields, prev_job_info, prev_log_serial):
639     """Initializes this class.
640
641     @type fields: list of strings
642     @param fields: Fields requested by LUXI client
643     @type prev_job_info: string
644     @param prev_job_info: previous job info, as passed by the LUXI client
645     @type prev_log_serial: string
646     @param prev_log_serial: previous job serial, as passed by the LUXI client
647
648     """
649     self._squery = _SimpleJobQuery(fields)
650     self._prev_job_info = prev_job_info
651     self._prev_log_serial = prev_log_serial
652
653   def __call__(self, job):
654     """Checks whether job has changed.
655
656     @type job: L{_QueuedJob}
657     @param job: Job object
658
659     """
660     assert not job.writable, "Expected read-only job"
661
662     status = job.CalcStatus()
663     job_info = self._squery(job)
664     log_entries = job.GetLogEntries(self._prev_log_serial)
665
666     # Serializing and deserializing data can cause type changes (e.g. from
667     # tuple to list) or precision loss. We're doing it here so that we get
668     # the same modifications as the data received from the client. Without
669     # this, the comparison afterwards might fail without the data being
670     # significantly different.
671     # TODO: we just deserialized from disk, investigate how to make sure that
672     # the job info and log entries are compatible to avoid this further step.
673     # TODO: Doing something like in testutils.py:UnifyValueType might be more
674     # efficient, though floats will be tricky
675     job_info = serializer.LoadJson(serializer.DumpJson(job_info))
676     log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
677
678     # Don't even try to wait if the job is no longer running, there will be
679     # no changes.
680     if (status not in (constants.JOB_STATUS_QUEUED,
681                        constants.JOB_STATUS_RUNNING,
682                        constants.JOB_STATUS_WAITING) or
683         job_info != self._prev_job_info or
684         (log_entries and self._prev_log_serial != log_entries[0][0])):
685       logging.debug("Job %s changed", job.id)
686       return (job_info, log_entries)
687
688     return None
689
690
691 class _JobFileChangesWaiter(object):
692   def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
693     """Initializes this class.
694
695     @type filename: string
696     @param filename: Path to job file
697     @raises errors.InotifyError: if the notifier cannot be setup
698
699     """
700     self._wm = _inotify_wm_cls()
701     self._inotify_handler = \
702       asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
703     self._notifier = \
704       pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
705     try:
706       self._inotify_handler.enable()
707     except Exception:
708       # pyinotify doesn't close file descriptors automatically
709       self._notifier.stop()
710       raise
711
712   def _OnInotify(self, notifier_enabled):
713     """Callback for inotify.
714
715     """
716     if not notifier_enabled:
717       self._inotify_handler.enable()
718
719   def Wait(self, timeout):
720     """Waits for the job file to change.
721
722     @type timeout: float
723     @param timeout: Timeout in seconds
724     @return: Whether there have been events
725
726     """
727     assert timeout >= 0
728     have_events = self._notifier.check_events(timeout * 1000)
729     if have_events:
730       self._notifier.read_events()
731     self._notifier.process_events()
732     return have_events
733
734   def Close(self):
735     """Closes underlying notifier and its file descriptor.
736
737     """
738     self._notifier.stop()
739
740
741 class _JobChangesWaiter(object):
742   def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
743     """Initializes this class.
744
745     @type filename: string
746     @param filename: Path to job file
747
748     """
749     self._filewaiter = None
750     self._filename = filename
751     self._waiter_cls = _waiter_cls
752
753   def Wait(self, timeout):
754     """Waits for a job to change.
755
756     @type timeout: float
757     @param timeout: Timeout in seconds
758     @return: Whether there have been events
759
760     """
761     if self._filewaiter:
762       return self._filewaiter.Wait(timeout)
763
764     # Lazy setup: Avoid inotify setup cost when job file has already changed.
765     # If this point is reached, return immediately and let caller check the job
766     # file again in case there were changes since the last check. This avoids a
767     # race condition.
768     self._filewaiter = self._waiter_cls(self._filename)
769
770     return True
771
772   def Close(self):
773     """Closes underlying waiter.
774
775     """
776     if self._filewaiter:
777       self._filewaiter.Close()
778
779
780 class _WaitForJobChangesHelper(object):
781   """Helper class using inotify to wait for changes in a job file.
782
783   This class takes a previous job status and serial, and alerts the client when
784   the current job status has changed.
785
786   """
787   @staticmethod
788   def _CheckForChanges(counter, job_load_fn, check_fn):
789     if counter.next() > 0:
790       # If this isn't the first check the job is given some more time to change
791       # again. This gives better performance for jobs generating many
792       # changes/messages.
793       time.sleep(0.1)
794
795     job = job_load_fn()
796     if not job:
797       raise errors.JobLost()
798
799     result = check_fn(job)
800     if result is None:
801       raise utils.RetryAgain()
802
803     return result
804
805   def __call__(self, filename, job_load_fn,
806                fields, prev_job_info, prev_log_serial, timeout,
807                _waiter_cls=_JobChangesWaiter):
808     """Waits for changes on a job.
809
810     @type filename: string
811     @param filename: File on which to wait for changes
812     @type job_load_fn: callable
813     @param job_load_fn: Function to load job
814     @type fields: list of strings
815     @param fields: Which fields to check for changes
816     @type prev_job_info: list or None
817     @param prev_job_info: Last job information returned
818     @type prev_log_serial: int
819     @param prev_log_serial: Last job message serial number
820     @type timeout: float
821     @param timeout: maximum time to wait in seconds
822
823     """
824     counter = itertools.count()
825     try:
826       check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
827       waiter = _waiter_cls(filename)
828       try:
829         return utils.Retry(compat.partial(self._CheckForChanges,
830                                           counter, job_load_fn, check_fn),
831                            utils.RETRY_REMAINING_TIME, timeout,
832                            wait_fn=waiter.Wait)
833       finally:
834         waiter.Close()
835     except errors.JobLost:
836       return None
837     except utils.RetryTimeout:
838       return constants.JOB_NOTCHANGED
839
840
841 def _EncodeOpError(err):
842   """Encodes an error which occurred while processing an opcode.
843
844   """
845   if isinstance(err, errors.GenericError):
846     to_encode = err
847   else:
848     to_encode = errors.OpExecError(str(err))
849
850   return errors.EncodeException(to_encode)
851
852
853 class _TimeoutStrategyWrapper:
854   def __init__(self, fn):
855     """Initializes this class.
856
857     """
858     self._fn = fn
859     self._next = None
860
861   def _Advance(self):
862     """Gets the next timeout if necessary.
863
864     """
865     if self._next is None:
866       self._next = self._fn()
867
868   def Peek(self):
869     """Returns the next timeout.
870
871     """
872     self._Advance()
873     return self._next
874
875   def Next(self):
876     """Returns the current timeout and advances the internal state.
877
878     """
879     self._Advance()
880     result = self._next
881     self._next = None
882     return result
883
884
885 class _OpExecContext:
886   def __init__(self, op, index, log_prefix, timeout_strategy_factory):
887     """Initializes this class.
888
889     """
890     self.op = op
891     self.index = index
892     self.log_prefix = log_prefix
893     self.summary = op.input.Summary()
894
895     # Create local copy to modify
896     if getattr(op.input, opcodes.DEPEND_ATTR, None):
897       self.jobdeps = op.input.depends[:]
898     else:
899       self.jobdeps = None
900
901     self._timeout_strategy_factory = timeout_strategy_factory
902     self._ResetTimeoutStrategy()
903
904   def _ResetTimeoutStrategy(self):
905     """Creates a new timeout strategy.
906
907     """
908     self._timeout_strategy = \
909       _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
910
911   def CheckPriorityIncrease(self):
912     """Checks whether priority can and should be increased.
913
914     Called when locks couldn't be acquired.
915
916     """
917     op = self.op
918
919     # Exhausted all retries and next round should not use blocking acquire
920     # for locks?
921     if (self._timeout_strategy.Peek() is None and
922         op.priority > constants.OP_PRIO_HIGHEST):
923       logging.debug("Increasing priority")
924       op.priority -= 1
925       self._ResetTimeoutStrategy()
926       return True
927
928     return False
929
930   def GetNextLockTimeout(self):
931     """Returns the next lock acquire timeout.
932
933     """
934     return self._timeout_strategy.Next()
935
936
937 class _JobProcessor(object):
938   (DEFER,
939    WAITDEP,
940    FINISHED) = range(1, 4)
941
942   def __init__(self, queue, opexec_fn, job,
943                _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
944     """Initializes this class.
945
946     """
947     self.queue = queue
948     self.opexec_fn = opexec_fn
949     self.job = job
950     self._timeout_strategy_factory = _timeout_strategy_factory
951
952   @staticmethod
953   def _FindNextOpcode(job, timeout_strategy_factory):
954     """Locates the next opcode to run.
955
956     @type job: L{_QueuedJob}
957     @param job: Job object
958     @param timeout_strategy_factory: Callable to create new timeout strategy
959
960     """
961     # Create some sort of a cache to speed up locating next opcode for future
962     # lookups
963     # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
964     # pending and one for processed ops.
965     if job.ops_iter is None:
966       job.ops_iter = enumerate(job.ops)
967
968     # Find next opcode to run
969     while True:
970       try:
971         (idx, op) = job.ops_iter.next()
972       except StopIteration:
973         raise errors.ProgrammerError("Called for a finished job")
974
975       if op.status == constants.OP_STATUS_RUNNING:
976         # Found an opcode already marked as running
977         raise errors.ProgrammerError("Called for job marked as running")
978
979       opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
980                              timeout_strategy_factory)
981
982       if op.status not in constants.OPS_FINALIZED:
983         return opctx
984
985       # This is a job that was partially completed before master daemon
986       # shutdown, so it can be expected that some opcodes are already
987       # completed successfully (if any did error out, then the whole job
988       # should have been aborted and not resubmitted for processing).
989       logging.info("%s: opcode %s already processed, skipping",
990                    opctx.log_prefix, opctx.summary)
991
992   @staticmethod
993   def _MarkWaitlock(job, op):
994     """Marks an opcode as waiting for locks.
995
996     The job's start timestamp is also set if necessary.
997
998     @type job: L{_QueuedJob}
999     @param job: Job object
1000     @type op: L{_QueuedOpCode}
1001     @param op: Opcode object
1002
1003     """
1004     assert op in job.ops
1005     assert op.status in (constants.OP_STATUS_QUEUED,
1006                          constants.OP_STATUS_WAITING)
1007
1008     update = False
1009
1010     op.result = None
1011
1012     if op.status == constants.OP_STATUS_QUEUED:
1013       op.status = constants.OP_STATUS_WAITING
1014       update = True
1015
1016     if op.start_timestamp is None:
1017       op.start_timestamp = TimeStampNow()
1018       update = True
1019
1020     if job.start_timestamp is None:
1021       job.start_timestamp = op.start_timestamp
1022       update = True
1023
1024     assert op.status == constants.OP_STATUS_WAITING
1025
1026     return update
1027
1028   @staticmethod
1029   def _CheckDependencies(queue, job, opctx):
1030     """Checks if an opcode has dependencies and if so, processes them.
1031
1032     @type queue: L{JobQueue}
1033     @param queue: Queue object
1034     @type job: L{_QueuedJob}
1035     @param job: Job object
1036     @type opctx: L{_OpExecContext}
1037     @param opctx: Opcode execution context
1038     @rtype: bool
1039     @return: Whether opcode will be re-scheduled by dependency tracker
1040
1041     """
1042     op = opctx.op
1043
1044     result = False
1045
1046     while opctx.jobdeps:
1047       (dep_job_id, dep_status) = opctx.jobdeps[0]
1048
1049       (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
1050                                                           dep_status)
1051       assert ht.TNonEmptyString(depmsg), "No dependency message"
1052
1053       logging.info("%s: %s", opctx.log_prefix, depmsg)
1054
1055       if depresult == _JobDependencyManager.CONTINUE:
1056         # Remove dependency and continue
1057         opctx.jobdeps.pop(0)
1058
1059       elif depresult == _JobDependencyManager.WAIT:
1060         # Need to wait for notification, dependency tracker will re-add job
1061         # to workerpool
1062         result = True
1063         break
1064
1065       elif depresult == _JobDependencyManager.CANCEL:
1066         # Job was cancelled, cancel this job as well
1067         job.Cancel()
1068         assert op.status == constants.OP_STATUS_CANCELING
1069         break
1070
1071       elif depresult in (_JobDependencyManager.WRONGSTATUS,
1072                          _JobDependencyManager.ERROR):
1073         # Job failed or there was an error, this job must fail
1074         op.status = constants.OP_STATUS_ERROR
1075         op.result = _EncodeOpError(errors.OpExecError(depmsg))
1076         break
1077
1078       else:
1079         raise errors.ProgrammerError("Unknown dependency result '%s'" %
1080                                      depresult)
1081
1082     return result
1083
1084   def _ExecOpCodeUnlocked(self, opctx):
1085     """Processes one opcode and returns the result.
1086
1087     """
1088     op = opctx.op
1089
1090     assert op.status == constants.OP_STATUS_WAITING
1091
1092     timeout = opctx.GetNextLockTimeout()
1093
1094     try:
1095       # Make sure not to hold queue lock while calling ExecOpCode
1096       result = self.opexec_fn(op.input,
1097                               _OpExecCallbacks(self.queue, self.job, op),
1098                               timeout=timeout)
1099     except mcpu.LockAcquireTimeout:
1100       assert timeout is not None, "Received timeout for blocking acquire"
1101       logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1102
1103       assert op.status in (constants.OP_STATUS_WAITING,
1104                            constants.OP_STATUS_CANCELING)
1105
1106       # Was job cancelled while we were waiting for the lock?
1107       if op.status == constants.OP_STATUS_CANCELING:
1108         return (constants.OP_STATUS_CANCELING, None)
1109
1110       # Queue is shutting down, return to queued
1111       if not self.queue.AcceptingJobsUnlocked():
1112         return (constants.OP_STATUS_QUEUED, None)
1113
1114       # Stay in waitlock while trying to re-acquire lock
1115       return (constants.OP_STATUS_WAITING, None)
1116     except CancelJob:
1117       logging.exception("%s: Canceling job", opctx.log_prefix)
1118       assert op.status == constants.OP_STATUS_CANCELING
1119       return (constants.OP_STATUS_CANCELING, None)
1120
1121     except QueueShutdown:
1122       logging.exception("%s: Queue is shutting down", opctx.log_prefix)
1123
1124       assert op.status == constants.OP_STATUS_WAITING
1125
1126       # Job hadn't been started yet, so it should return to the queue
1127       return (constants.OP_STATUS_QUEUED, None)
1128
1129     except Exception, err: # pylint: disable=W0703
1130       logging.exception("%s: Caught exception in %s",
1131                         opctx.log_prefix, opctx.summary)
1132       return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1133     else:
1134       logging.debug("%s: %s successful",
1135                     opctx.log_prefix, opctx.summary)
1136       return (constants.OP_STATUS_SUCCESS, result)
1137
1138   def __call__(self, _nextop_fn=None):
1139     """Continues execution of a job.
1140
1141     @param _nextop_fn: Callback function for tests
1142     @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1143       be deferred and C{WAITDEP} if the dependency manager
1144       (L{_JobDependencyManager}) will re-schedule the job when appropriate
1145
1146     """
1147     queue = self.queue
1148     job = self.job
1149
1150     logging.debug("Processing job %s", job.id)
1151
1152     queue.acquire(shared=1)
1153     try:
1154       opcount = len(job.ops)
1155
1156       assert job.writable, "Expected writable job"
1157
1158       # Don't do anything for finalized jobs
1159       if job.CalcStatus() in constants.JOBS_FINALIZED:
1160         return self.FINISHED
1161
1162       # Is a previous opcode still pending?
1163       if job.cur_opctx:
1164         opctx = job.cur_opctx
1165         job.cur_opctx = None
1166       else:
1167         if __debug__ and _nextop_fn:
1168           _nextop_fn()
1169         opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1170
1171       op = opctx.op
1172
1173       # Consistency check
1174       assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1175                                      constants.OP_STATUS_CANCELING)
1176                         for i in job.ops[opctx.index + 1:])
1177
1178       assert op.status in (constants.OP_STATUS_QUEUED,
1179                            constants.OP_STATUS_WAITING,
1180                            constants.OP_STATUS_CANCELING)
1181
1182       assert (op.priority <= constants.OP_PRIO_LOWEST and
1183               op.priority >= constants.OP_PRIO_HIGHEST)
1184
1185       waitjob = None
1186
1187       if op.status != constants.OP_STATUS_CANCELING:
1188         assert op.status in (constants.OP_STATUS_QUEUED,
1189                              constants.OP_STATUS_WAITING)
1190
1191         # Prepare to start opcode
1192         if self._MarkWaitlock(job, op):
1193           # Write to disk
1194           queue.UpdateJobUnlocked(job)
1195
1196         assert op.status == constants.OP_STATUS_WAITING
1197         assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1198         assert job.start_timestamp and op.start_timestamp
1199         assert waitjob is None
1200
1201         # Check if waiting for a job is necessary
1202         waitjob = self._CheckDependencies(queue, job, opctx)
1203
1204         assert op.status in (constants.OP_STATUS_WAITING,
1205                              constants.OP_STATUS_CANCELING,
1206                              constants.OP_STATUS_ERROR)
1207
1208         if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1209                                          constants.OP_STATUS_ERROR)):
1210           logging.info("%s: opcode %s waiting for locks",
1211                        opctx.log_prefix, opctx.summary)
1212
1213           assert not opctx.jobdeps, "Not all dependencies were removed"
1214
1215           queue.release()
1216           try:
1217             (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1218           finally:
1219             queue.acquire(shared=1)
1220
1221           op.status = op_status
1222           op.result = op_result
1223
1224           assert not waitjob
1225
1226         if op.status in (constants.OP_STATUS_WAITING,
1227                          constants.OP_STATUS_QUEUED):
1228           # waiting: Couldn't get locks in time
1229           # queued: Queue is shutting down
1230           assert not op.end_timestamp
1231         else:
1232           # Finalize opcode
1233           op.end_timestamp = TimeStampNow()
1234
1235           if op.status == constants.OP_STATUS_CANCELING:
1236             assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1237                                   for i in job.ops[opctx.index:])
1238           else:
1239             assert op.status in constants.OPS_FINALIZED
1240
1241       if op.status == constants.OP_STATUS_QUEUED:
1242         # Queue is shutting down
1243         assert not waitjob
1244
1245         finalize = False
1246
1247         # Reset context
1248         job.cur_opctx = None
1249
1250         # In no case must the status be finalized here
1251         assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1252
1253       elif op.status == constants.OP_STATUS_WAITING or waitjob:
1254         finalize = False
1255
1256         if not waitjob and opctx.CheckPriorityIncrease():
1257           # Priority was changed, need to update on-disk file
1258           queue.UpdateJobUnlocked(job)
1259
1260         # Keep around for another round
1261         job.cur_opctx = opctx
1262
1263         assert (op.priority <= constants.OP_PRIO_LOWEST and
1264                 op.priority >= constants.OP_PRIO_HIGHEST)
1265
1266         # In no case must the status be finalized here
1267         assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1268
1269       else:
1270         # Ensure all opcodes so far have been successful
1271         assert (opctx.index == 0 or
1272                 compat.all(i.status == constants.OP_STATUS_SUCCESS
1273                            for i in job.ops[:opctx.index]))
1274
1275         # Reset context
1276         job.cur_opctx = None
1277
1278         if op.status == constants.OP_STATUS_SUCCESS:
1279           finalize = False
1280
1281         elif op.status == constants.OP_STATUS_ERROR:
1282           # Ensure failed opcode has an exception as its result
1283           assert errors.GetEncodedError(job.ops[opctx.index].result)
1284
1285           to_encode = errors.OpExecError("Preceding opcode failed")
1286           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1287                                 _EncodeOpError(to_encode))
1288           finalize = True
1289
1290           # Consistency check
1291           assert compat.all(i.status == constants.OP_STATUS_ERROR and
1292                             errors.GetEncodedError(i.result)
1293                             for i in job.ops[opctx.index:])
1294
1295         elif op.status == constants.OP_STATUS_CANCELING:
1296           job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1297                                 "Job canceled by request")
1298           finalize = True
1299
1300         else:
1301           raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1302
1303         if opctx.index == (opcount - 1):
1304           # Finalize on last opcode
1305           finalize = True
1306
1307         if finalize:
1308           # All opcodes have been run, finalize job
1309           job.Finalize()
1310
1311         # Write to disk. If the job status is final, this is the final write
1312         # allowed. Once the file has been written, it can be archived anytime.
1313         queue.UpdateJobUnlocked(job)
1314
1315         assert not waitjob
1316
1317         if finalize:
1318           logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1319           return self.FINISHED
1320
1321       assert not waitjob or queue.depmgr.JobWaiting(job)
1322
1323       if waitjob:
1324         return self.WAITDEP
1325       else:
1326         return self.DEFER
1327     finally:
1328       assert job.writable, "Job became read-only while being processed"
1329       queue.release()
1330
1331
1332 def _EvaluateJobProcessorResult(depmgr, job, result):
1333   """Looks at a result from L{_JobProcessor} for a job.
1334
1335   To be used in a L{_JobQueueWorker}.
1336
1337   """
1338   if result == _JobProcessor.FINISHED:
1339     # Notify waiting jobs
1340     depmgr.NotifyWaiters(job.id)
1341
1342   elif result == _JobProcessor.DEFER:
1343     # Schedule again
1344     raise workerpool.DeferTask(priority=job.CalcPriority())
1345
1346   elif result == _JobProcessor.WAITDEP:
1347     # No-op, dependency manager will re-schedule
1348     pass
1349
1350   else:
1351     raise errors.ProgrammerError("Job processor returned unknown status %s" %
1352                                  (result, ))
1353
1354
1355 class _JobQueueWorker(workerpool.BaseWorker):
1356   """The actual job workers.
1357
1358   """
1359   def RunTask(self, job): # pylint: disable=W0221
1360     """Job executor.
1361
1362     @type job: L{_QueuedJob}
1363     @param job: the job to be processed
1364
1365     """
1366     assert job.writable, "Expected writable job"
1367
1368     # Ensure only one worker is active on a single job. If a job registers for
1369     # a dependency job, and the other job notifies before the first worker is
1370     # done, the job can end up in the tasklist more than once.
1371     job.processor_lock.acquire()
1372     try:
1373       return self._RunTaskInner(job)
1374     finally:
1375       job.processor_lock.release()
1376
1377   def _RunTaskInner(self, job):
1378     """Executes a job.
1379
1380     Must be called with per-job lock acquired.
1381
1382     """
1383     queue = job.queue
1384     assert queue == self.pool.queue
1385
1386     setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1387     setname_fn(None)
1388
1389     proc = mcpu.Processor(queue.context, job.id)
1390
1391     # Create wrapper for setting thread name
1392     wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1393                                     proc.ExecOpCode)
1394
1395     _EvaluateJobProcessorResult(queue.depmgr, job,
1396                                 _JobProcessor(queue, wrap_execop_fn, job)())
1397
1398   @staticmethod
1399   def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1400     """Updates the worker thread name to include a short summary of the opcode.
1401
1402     @param setname_fn: Callable setting worker thread name
1403     @param execop_fn: Callable for executing opcode (usually
1404                       L{mcpu.Processor.ExecOpCode})
1405
1406     """
1407     setname_fn(op)
1408     try:
1409       return execop_fn(op, *args, **kwargs)
1410     finally:
1411       setname_fn(None)
1412
1413   @staticmethod
1414   def _GetWorkerName(job, op):
1415     """Sets the worker thread name.
1416
1417     @type job: L{_QueuedJob}
1418     @type op: L{opcodes.OpCode}
1419
1420     """
1421     parts = ["Job%s" % job.id]
1422
1423     if op:
1424       parts.append(op.TinySummary())
1425
1426     return "/".join(parts)
1427
1428
1429 class _JobQueueWorkerPool(workerpool.WorkerPool):
1430   """Simple class implementing a job-processing workerpool.
1431
1432   """
1433   def __init__(self, queue):
1434     super(_JobQueueWorkerPool, self).__init__("Jq",
1435                                               JOBQUEUE_THREADS,
1436                                               _JobQueueWorker)
1437     self.queue = queue
1438
1439
1440 class _JobDependencyManager:
1441   """Keeps track of job dependencies.
1442
1443   """
1444   (WAIT,
1445    ERROR,
1446    CANCEL,
1447    CONTINUE,
1448    WRONGSTATUS) = range(1, 6)
1449
1450   def __init__(self, getstatus_fn, enqueue_fn):
1451     """Initializes this class.
1452
1453     """
1454     self._getstatus_fn = getstatus_fn
1455     self._enqueue_fn = enqueue_fn
1456
1457     self._waiters = {}
1458     self._lock = locking.SharedLock("JobDepMgr")
1459
1460   @locking.ssynchronized(_LOCK, shared=1)
1461   def GetLockInfo(self, requested): # pylint: disable=W0613
1462     """Retrieves information about waiting jobs.
1463
1464     @type requested: set
1465     @param requested: Requested information, see C{query.LQ_*}
1466
1467     """
1468     # No need to sort here, that's being done by the lock manager and query
1469     # library. There are no priorities for notifying jobs, hence all show up as
1470     # one item under "pending".
1471     return [("job/%s" % job_id, None, None,
1472              [("job", [job.id for job in waiters])])
1473             for job_id, waiters in self._waiters.items()
1474             if waiters]
1475
1476   @locking.ssynchronized(_LOCK, shared=1)
1477   def JobWaiting(self, job):
1478     """Checks if a job is waiting.
1479
1480     """
1481     return compat.any(job in jobs
1482                       for jobs in self._waiters.values())
1483
1484   @locking.ssynchronized(_LOCK)
1485   def CheckAndRegister(self, job, dep_job_id, dep_status):
1486     """Checks if a dependency job has the requested status.
1487
1488     If the other job is not yet in a finalized status, the calling job will be
1489     notified (re-added to the workerpool) at a later point.
1490
1491     @type job: L{_QueuedJob}
1492     @param job: Job object
1493     @type dep_job_id: int
1494     @param dep_job_id: ID of dependency job
1495     @type dep_status: list
1496     @param dep_status: Required status
1497
1498     """
1499     assert ht.TJobId(job.id)
1500     assert ht.TJobId(dep_job_id)
1501     assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1502
1503     if job.id == dep_job_id:
1504       return (self.ERROR, "Job can't depend on itself")
1505
1506     # Get status of dependency job
1507     try:
1508       status = self._getstatus_fn(dep_job_id)
1509     except errors.JobLost, err:
1510       return (self.ERROR, "Dependency error: %s" % err)
1511
1512     assert status in constants.JOB_STATUS_ALL
1513
1514     job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1515
1516     if status not in constants.JOBS_FINALIZED:
1517       # Register for notification and wait for job to finish
1518       job_id_waiters.add(job)
1519       return (self.WAIT,
1520               "Need to wait for job %s, wanted status '%s'" %
1521               (dep_job_id, dep_status))
1522
1523     # Remove from waiters list
1524     if job in job_id_waiters:
1525       job_id_waiters.remove(job)
1526
1527     if (status == constants.JOB_STATUS_CANCELED and
1528         constants.JOB_STATUS_CANCELED not in dep_status):
1529       return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1530
1531     elif not dep_status or status in dep_status:
1532       return (self.CONTINUE,
1533               "Dependency job %s finished with status '%s'" %
1534               (dep_job_id, status))
1535
1536     else:
1537       return (self.WRONGSTATUS,
1538               "Dependency job %s finished with status '%s',"
1539               " not one of '%s' as required" %
1540               (dep_job_id, status, utils.CommaJoin(dep_status)))
1541
1542   def _RemoveEmptyWaitersUnlocked(self):
1543     """Remove all jobs without actual waiters.
1544
1545     """
1546     for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1547                    if not waiters]:
1548       del self._waiters[job_id]
1549
1550   def NotifyWaiters(self, job_id):
1551     """Notifies all jobs waiting for a certain job ID.
1552
1553     @attention: Do not call until L{CheckAndRegister} returned a status other
1554       than C{WAITDEP} for C{job_id}, or behaviour is undefined
1555     @type job_id: int
1556     @param job_id: Job ID
1557
1558     """
1559     assert ht.TJobId(job_id)
1560
1561     self._lock.acquire()
1562     try:
1563       self._RemoveEmptyWaitersUnlocked()
1564
1565       jobs = self._waiters.pop(job_id, None)
1566     finally:
1567       self._lock.release()
1568
1569     if jobs:
1570       # Re-add jobs to workerpool
1571       logging.debug("Re-adding %s jobs which were waiting for job %s",
1572                     len(jobs), job_id)
1573       self._enqueue_fn(jobs)
1574
1575
1576 def _RequireOpenQueue(fn):
1577   """Decorator for "public" functions.
1578
1579   This function should be used for all 'public' functions. That is,
1580   functions usually called from other classes. Note that this should
1581   be applied only to methods (not plain functions), since it expects
1582   that the decorated function is called with a first argument that has
1583   a '_queue_filelock' argument.
1584
1585   @warning: Use this decorator only after locking.ssynchronized
1586
1587   Example::
1588     @locking.ssynchronized(_LOCK)
1589     @_RequireOpenQueue
1590     def Example(self):
1591       pass
1592
1593   """
1594   def wrapper(self, *args, **kwargs):
1595     # pylint: disable=W0212
1596     assert self._queue_filelock is not None, "Queue should be open"
1597     return fn(self, *args, **kwargs)
1598   return wrapper
1599
1600
1601 def _RequireNonDrainedQueue(fn):
1602   """Decorator checking for a non-drained queue.
1603
1604   To be used with functions submitting new jobs.
1605
1606   """
1607   def wrapper(self, *args, **kwargs):
1608     """Wrapper function.
1609
1610     @raise errors.JobQueueDrainError: if the job queue is marked for draining
1611
1612     """
1613     # Ok when sharing the big job queue lock, as the drain file is created when
1614     # the lock is exclusive.
1615     # Needs access to protected member, pylint: disable=W0212
1616     if self._drained:
1617       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1618
1619     if not self._accepting_jobs:
1620       raise errors.JobQueueError("Job queue is shutting down, refusing job")
1621
1622     return fn(self, *args, **kwargs)
1623   return wrapper
1624
1625
1626 class JobQueue(object):
1627   """Queue used to manage the jobs.
1628
1629   """
1630   def __init__(self, context):
1631     """Constructor for JobQueue.
1632
1633     The constructor will initialize the job queue object and then
1634     start loading the current jobs from disk, either for starting them
1635     (if they were queue) or for aborting them (if they were already
1636     running).
1637
1638     @type context: GanetiContext
1639     @param context: the context object for access to the configuration
1640         data and other ganeti objects
1641
1642     """
1643     self.context = context
1644     self._memcache = weakref.WeakValueDictionary()
1645     self._my_hostname = netutils.Hostname.GetSysName()
1646
1647     # The Big JobQueue lock. If a code block or method acquires it in shared
1648     # mode safe it must guarantee concurrency with all the code acquiring it in
1649     # shared mode, including itself. In order not to acquire it at all
1650     # concurrency must be guaranteed with all code acquiring it in shared mode
1651     # and all code acquiring it exclusively.
1652     self._lock = locking.SharedLock("JobQueue")
1653
1654     self.acquire = self._lock.acquire
1655     self.release = self._lock.release
1656
1657     # Accept jobs by default
1658     self._accepting_jobs = True
1659
1660     # Initialize the queue, and acquire the filelock.
1661     # This ensures no other process is working on the job queue.
1662     self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1663
1664     # Read serial file
1665     self._last_serial = jstore.ReadSerial()
1666     assert self._last_serial is not None, ("Serial file was modified between"
1667                                            " check in jstore and here")
1668
1669     # Get initial list of nodes
1670     self._nodes = dict((n.name, n.primary_ip)
1671                        for n in self.context.cfg.GetAllNodesInfo().values()
1672                        if n.master_candidate)
1673
1674     # Remove master node
1675     self._nodes.pop(self._my_hostname, None)
1676
1677     # TODO: Check consistency across nodes
1678
1679     self._queue_size = None
1680     self._UpdateQueueSizeUnlocked()
1681     assert ht.TInt(self._queue_size)
1682     self._drained = jstore.CheckDrainFlag()
1683
1684     # Job dependencies
1685     self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1686                                         self._EnqueueJobs)
1687     self.context.glm.AddToLockMonitor(self.depmgr)
1688
1689     # Setup worker pool
1690     self._wpool = _JobQueueWorkerPool(self)
1691     try:
1692       self._InspectQueue()
1693     except:
1694       self._wpool.TerminateWorkers()
1695       raise
1696
1697   @locking.ssynchronized(_LOCK)
1698   @_RequireOpenQueue
1699   def _InspectQueue(self):
1700     """Loads the whole job queue and resumes unfinished jobs.
1701
1702     This function needs the lock here because WorkerPool.AddTask() may start a
1703     job while we're still doing our work.
1704
1705     """
1706     logging.info("Inspecting job queue")
1707
1708     restartjobs = []
1709
1710     all_job_ids = self._GetJobIDsUnlocked()
1711     jobs_count = len(all_job_ids)
1712     lastinfo = time.time()
1713     for idx, job_id in enumerate(all_job_ids):
1714       # Give an update every 1000 jobs or 10 seconds
1715       if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1716           idx == (jobs_count - 1)):
1717         logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1718                      idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1719         lastinfo = time.time()
1720
1721       job = self._LoadJobUnlocked(job_id)
1722
1723       # a failure in loading the job can cause 'None' to be returned
1724       if job is None:
1725         continue
1726
1727       status = job.CalcStatus()
1728
1729       if status == constants.JOB_STATUS_QUEUED:
1730         restartjobs.append(job)
1731
1732       elif status in (constants.JOB_STATUS_RUNNING,
1733                       constants.JOB_STATUS_WAITING,
1734                       constants.JOB_STATUS_CANCELING):
1735         logging.warning("Unfinished job %s found: %s", job.id, job)
1736
1737         if status == constants.JOB_STATUS_WAITING:
1738           # Restart job
1739           job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1740           restartjobs.append(job)
1741         else:
1742           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1743                                 "Unclean master daemon shutdown")
1744           job.Finalize()
1745
1746         self.UpdateJobUnlocked(job)
1747
1748     if restartjobs:
1749       logging.info("Restarting %s jobs", len(restartjobs))
1750       self._EnqueueJobsUnlocked(restartjobs)
1751
1752     logging.info("Job queue inspection finished")
1753
1754   def _GetRpc(self, address_list):
1755     """Gets RPC runner with context.
1756
1757     """
1758     return rpc.JobQueueRunner(self.context, address_list)
1759
1760   @locking.ssynchronized(_LOCK)
1761   @_RequireOpenQueue
1762   def AddNode(self, node):
1763     """Register a new node with the queue.
1764
1765     @type node: L{objects.Node}
1766     @param node: the node object to be added
1767
1768     """
1769     node_name = node.name
1770     assert node_name != self._my_hostname
1771
1772     # Clean queue directory on added node
1773     result = self._GetRpc(None).call_jobqueue_purge(node_name)
1774     msg = result.fail_msg
1775     if msg:
1776       logging.warning("Cannot cleanup queue directory on node %s: %s",
1777                       node_name, msg)
1778
1779     if not node.master_candidate:
1780       # remove if existing, ignoring errors
1781       self._nodes.pop(node_name, None)
1782       # and skip the replication of the job ids
1783       return
1784
1785     # Upload the whole queue excluding archived jobs
1786     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1787
1788     # Upload current serial file
1789     files.append(pathutils.JOB_QUEUE_SERIAL_FILE)
1790
1791     # Static address list
1792     addrs = [node.primary_ip]
1793
1794     for file_name in files:
1795       # Read file content
1796       content = utils.ReadFile(file_name)
1797
1798       result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
1799                              file_name, content)
1800       msg = result[node_name].fail_msg
1801       if msg:
1802         logging.error("Failed to upload file %s to node %s: %s",
1803                       file_name, node_name, msg)
1804
1805     # Set queue drained flag
1806     result = \
1807       self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name],
1808                                                        self._drained)
1809     msg = result[node_name].fail_msg
1810     if msg:
1811       logging.error("Failed to set queue drained flag on node %s: %s",
1812                     node_name, msg)
1813
1814     self._nodes[node_name] = node.primary_ip
1815
1816   @locking.ssynchronized(_LOCK)
1817   @_RequireOpenQueue
1818   def RemoveNode(self, node_name):
1819     """Callback called when removing nodes from the cluster.
1820
1821     @type node_name: str
1822     @param node_name: the name of the node to remove
1823
1824     """
1825     self._nodes.pop(node_name, None)
1826
1827   @staticmethod
1828   def _CheckRpcResult(result, nodes, failmsg):
1829     """Verifies the status of an RPC call.
1830
1831     Since we aim to keep consistency should this node (the current
1832     master) fail, we will log errors if our rpc fail, and especially
1833     log the case when more than half of the nodes fails.
1834
1835     @param result: the data as returned from the rpc call
1836     @type nodes: list
1837     @param nodes: the list of nodes we made the call to
1838     @type failmsg: str
1839     @param failmsg: the identifier to be used for logging
1840
1841     """
1842     failed = []
1843     success = []
1844
1845     for node in nodes:
1846       msg = result[node].fail_msg
1847       if msg:
1848         failed.append(node)
1849         logging.error("RPC call %s (%s) failed on node %s: %s",
1850                       result[node].call, failmsg, node, msg)
1851       else:
1852         success.append(node)
1853
1854     # +1 for the master node
1855     if (len(success) + 1) < len(failed):
1856       # TODO: Handle failing nodes
1857       logging.error("More than half of the nodes failed")
1858
1859   def _GetNodeIp(self):
1860     """Helper for returning the node name/ip list.
1861
1862     @rtype: (list, list)
1863     @return: a tuple of two lists, the first one with the node
1864         names and the second one with the node addresses
1865
1866     """
1867     # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1868     name_list = self._nodes.keys()
1869     addr_list = [self._nodes[name] for name in name_list]
1870     return name_list, addr_list
1871
1872   def _UpdateJobQueueFile(self, file_name, data, replicate):
1873     """Writes a file locally and then replicates it to all nodes.
1874
1875     This function will replace the contents of a file on the local
1876     node and then replicate it to all the other nodes we have.
1877
1878     @type file_name: str
1879     @param file_name: the path of the file to be replicated
1880     @type data: str
1881     @param data: the new contents of the file
1882     @type replicate: boolean
1883     @param replicate: whether to spread the changes to the remote nodes
1884
1885     """
1886     getents = runtime.GetEnts()
1887     utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1888                     gid=getents.daemons_gid,
1889                     mode=constants.JOB_QUEUE_FILES_PERMS)
1890
1891     if replicate:
1892       names, addrs = self._GetNodeIp()
1893       result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
1894       self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1895
1896   def _RenameFilesUnlocked(self, rename):
1897     """Renames a file locally and then replicate the change.
1898
1899     This function will rename a file in the local queue directory
1900     and then replicate this rename to all the other nodes we have.
1901
1902     @type rename: list of (old, new)
1903     @param rename: List containing tuples mapping old to new names
1904
1905     """
1906     # Rename them locally
1907     for old, new in rename:
1908       utils.RenameFile(old, new, mkdir=True)
1909
1910     # ... and on all nodes
1911     names, addrs = self._GetNodeIp()
1912     result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
1913     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1914
1915   def _NewSerialsUnlocked(self, count):
1916     """Generates a new job identifier.
1917
1918     Job identifiers are unique during the lifetime of a cluster.
1919
1920     @type count: integer
1921     @param count: how many serials to return
1922     @rtype: list of int
1923     @return: a list of job identifiers.
1924
1925     """
1926     assert ht.TNonNegativeInt(count)
1927
1928     # New number
1929     serial = self._last_serial + count
1930
1931     # Write to file
1932     self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
1933                              "%s\n" % serial, True)
1934
1935     result = [jstore.FormatJobID(v)
1936               for v in range(self._last_serial + 1, serial + 1)]
1937
1938     # Keep it only if we were able to write the file
1939     self._last_serial = serial
1940
1941     assert len(result) == count
1942
1943     return result
1944
1945   @staticmethod
1946   def _GetJobPath(job_id):
1947     """Returns the job file for a given job id.
1948
1949     @type job_id: str
1950     @param job_id: the job identifier
1951     @rtype: str
1952     @return: the path to the job file
1953
1954     """
1955     return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
1956
1957   @staticmethod
1958   def _GetArchivedJobPath(job_id):
1959     """Returns the archived job file for a give job id.
1960
1961     @type job_id: str
1962     @param job_id: the job identifier
1963     @rtype: str
1964     @return: the path to the archived job file
1965
1966     """
1967     return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
1968                           jstore.GetArchiveDirectory(job_id),
1969                           "job-%s" % job_id)
1970
1971   @staticmethod
1972   def _DetermineJobDirectories(archived):
1973     """Build list of directories containing job files.
1974
1975     @type archived: bool
1976     @param archived: Whether to include directories for archived jobs
1977     @rtype: list
1978
1979     """
1980     result = [pathutils.QUEUE_DIR]
1981
1982     if archived:
1983       archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
1984       result.extend(map(compat.partial(utils.PathJoin, archive_path),
1985                         utils.ListVisibleFiles(archive_path)))
1986
1987     return result
1988
1989   @classmethod
1990   def _GetJobIDsUnlocked(cls, sort=True, archived=False):
1991     """Return all known job IDs.
1992
1993     The method only looks at disk because it's a requirement that all
1994     jobs are present on disk (so in the _memcache we don't have any
1995     extra IDs).
1996
1997     @type sort: boolean
1998     @param sort: perform sorting on the returned job ids
1999     @rtype: list
2000     @return: the list of job IDs
2001
2002     """
2003     jlist = []
2004
2005     for path in cls._DetermineJobDirectories(archived):
2006       for filename in utils.ListVisibleFiles(path):
2007         m = constants.JOB_FILE_RE.match(filename)
2008         if m:
2009           jlist.append(int(m.group(1)))
2010
2011     if sort:
2012       jlist.sort()
2013     return jlist
2014
2015   def _LoadJobUnlocked(self, job_id):
2016     """Loads a job from the disk or memory.
2017
2018     Given a job id, this will return the cached job object if
2019     existing, or try to load the job from the disk. If loading from
2020     disk, it will also add the job to the cache.
2021
2022     @type job_id: int
2023     @param job_id: the job id
2024     @rtype: L{_QueuedJob} or None
2025     @return: either None or the job object
2026
2027     """
2028     job = self._memcache.get(job_id, None)
2029     if job:
2030       logging.debug("Found job %s in memcache", job_id)
2031       assert job.writable, "Found read-only job in memcache"
2032       return job
2033
2034     try:
2035       job = self._LoadJobFromDisk(job_id, False)
2036       if job is None:
2037         return job
2038     except errors.JobFileCorrupted:
2039       old_path = self._GetJobPath(job_id)
2040       new_path = self._GetArchivedJobPath(job_id)
2041       if old_path == new_path:
2042         # job already archived (future case)
2043         logging.exception("Can't parse job %s", job_id)
2044       else:
2045         # non-archived case
2046         logging.exception("Can't parse job %s, will archive.", job_id)
2047         self._RenameFilesUnlocked([(old_path, new_path)])
2048       return None
2049
2050     assert job.writable, "Job just loaded is not writable"
2051
2052     self._memcache[job_id] = job
2053     logging.debug("Added job %s to the cache", job_id)
2054     return job
2055
2056   def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
2057     """Load the given job file from disk.
2058
2059     Given a job file, read, load and restore it in a _QueuedJob format.
2060
2061     @type job_id: int
2062     @param job_id: job identifier
2063     @type try_archived: bool
2064     @param try_archived: Whether to try loading an archived job
2065     @rtype: L{_QueuedJob} or None
2066     @return: either None or the job object
2067
2068     """
2069     path_functions = [(self._GetJobPath, False)]
2070
2071     if try_archived:
2072       path_functions.append((self._GetArchivedJobPath, True))
2073
2074     raw_data = None
2075     archived = None
2076
2077     for (fn, archived) in path_functions:
2078       filepath = fn(job_id)
2079       logging.debug("Loading job from %s", filepath)
2080       try:
2081         raw_data = utils.ReadFile(filepath)
2082       except EnvironmentError, err:
2083         if err.errno != errno.ENOENT:
2084           raise
2085       else:
2086         break
2087
2088     if not raw_data:
2089       return None
2090
2091     if writable is None:
2092       writable = not archived
2093
2094     try:
2095       data = serializer.LoadJson(raw_data)
2096       job = _QueuedJob.Restore(self, data, writable, archived)
2097     except Exception, err: # pylint: disable=W0703
2098       raise errors.JobFileCorrupted(err)
2099
2100     return job
2101
2102   def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
2103     """Load the given job file from disk.
2104
2105     Given a job file, read, load and restore it in a _QueuedJob format.
2106     In case of error reading the job, it gets returned as None, and the
2107     exception is logged.
2108
2109     @type job_id: int
2110     @param job_id: job identifier
2111     @type try_archived: bool
2112     @param try_archived: Whether to try loading an archived job
2113     @rtype: L{_QueuedJob} or None
2114     @return: either None or the job object
2115
2116     """
2117     try:
2118       return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
2119     except (errors.JobFileCorrupted, EnvironmentError):
2120       logging.exception("Can't load/parse job %s", job_id)
2121       return None
2122
2123   def _UpdateQueueSizeUnlocked(self):
2124     """Update the queue size.
2125
2126     """
2127     self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
2128
2129   @locking.ssynchronized(_LOCK)
2130   @_RequireOpenQueue
2131   def SetDrainFlag(self, drain_flag):
2132     """Sets the drain flag for the queue.
2133
2134     @type drain_flag: boolean
2135     @param drain_flag: Whether to set or unset the drain flag
2136
2137     """
2138     # Change flag locally
2139     jstore.SetDrainFlag(drain_flag)
2140
2141     self._drained = drain_flag
2142
2143     # ... and on all nodes
2144     (names, addrs) = self._GetNodeIp()
2145     result = \
2146       self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag)
2147     self._CheckRpcResult(result, self._nodes,
2148                          "Setting queue drain flag to %s" % drain_flag)
2149
2150     return True
2151
2152   @_RequireOpenQueue
2153   def _SubmitJobUnlocked(self, job_id, ops):
2154     """Create and store a new job.
2155
2156     This enters the job into our job queue and also puts it on the new
2157     queue, in order for it to be picked up by the queue processors.
2158
2159     @type job_id: job ID
2160     @param job_id: the job ID for the new job
2161     @type ops: list
2162     @param ops: The list of OpCodes that will become the new job.
2163     @rtype: L{_QueuedJob}
2164     @return: the job object to be queued
2165     @raise errors.JobQueueFull: if the job queue has too many jobs in it
2166     @raise errors.GenericError: If an opcode is not valid
2167
2168     """
2169     if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
2170       raise errors.JobQueueFull()
2171
2172     job = _QueuedJob(self, job_id, ops, True)
2173
2174     for idx, op in enumerate(job.ops):
2175       # Check priority
2176       if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
2177         allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2178         raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
2179                                   " are %s" % (idx, op.priority, allowed))
2180
2181       # Check job dependencies
2182       dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
2183       if not opcodes.TNoRelativeJobDependencies(dependencies):
2184         raise errors.GenericError("Opcode %s has invalid dependencies, must"
2185                                   " match %s: %s" %
2186                                   (idx, opcodes.TNoRelativeJobDependencies,
2187                                    dependencies))
2188
2189     # Write to disk
2190     self.UpdateJobUnlocked(job)
2191
2192     self._queue_size += 1
2193
2194     logging.debug("Adding new job %s to the cache", job_id)
2195     self._memcache[job_id] = job
2196
2197     return job
2198
2199   @locking.ssynchronized(_LOCK)
2200   @_RequireOpenQueue
2201   @_RequireNonDrainedQueue
2202   def SubmitJob(self, ops):
2203     """Create and store a new job.
2204
2205     @see: L{_SubmitJobUnlocked}
2206
2207     """
2208     (job_id, ) = self._NewSerialsUnlocked(1)
2209     self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
2210     return job_id
2211
2212   @locking.ssynchronized(_LOCK)
2213   @_RequireOpenQueue
2214   @_RequireNonDrainedQueue
2215   def SubmitManyJobs(self, jobs):
2216     """Create and store multiple jobs.
2217
2218     @see: L{_SubmitJobUnlocked}
2219
2220     """
2221     all_job_ids = self._NewSerialsUnlocked(len(jobs))
2222
2223     (results, added_jobs) = \
2224       self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])
2225
2226     self._EnqueueJobsUnlocked(added_jobs)
2227
2228     return results
2229
2230   @staticmethod
2231   def _FormatSubmitError(msg, ops):
2232     """Formats errors which occurred while submitting a job.
2233
2234     """
2235     return ("%s; opcodes %s" %
2236             (msg, utils.CommaJoin(op.Summary() for op in ops)))
2237
2238   @staticmethod
2239   def _ResolveJobDependencies(resolve_fn, deps):
2240     """Resolves relative job IDs in dependencies.
2241
2242     @type resolve_fn: callable
2243     @param resolve_fn: Function to resolve a relative job ID
2244     @type deps: list
2245     @param deps: Dependencies
2246     @rtype: tuple; (boolean, string or list)
2247     @return: If successful (first tuple item), the returned list contains
2248       resolved job IDs along with the requested status; if not successful,
2249       the second element is an error message
2250
2251     """
2252     result = []
2253
2254     for (dep_job_id, dep_status) in deps:
2255       if ht.TRelativeJobId(dep_job_id):
2256         assert ht.TInt(dep_job_id) and dep_job_id < 0
2257         try:
2258           job_id = resolve_fn(dep_job_id)
2259         except IndexError:
2260           # Abort
2261           return (False, "Unable to resolve relative job ID %s" % dep_job_id)
2262       else:
2263         job_id = dep_job_id
2264
2265       result.append((job_id, dep_status))
2266
2267     return (True, result)
2268
2269   def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
2270     """Create and store multiple jobs.
2271
2272     @see: L{_SubmitJobUnlocked}
2273
2274     """
2275     results = []
2276     added_jobs = []
2277
2278     def resolve_fn(job_idx, reljobid):
2279       assert reljobid < 0
2280       return (previous_job_ids + job_ids[:job_idx])[reljobid]
2281
2282     for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
2283       for op in ops:
2284         if getattr(op, opcodes.DEPEND_ATTR, None):
2285           (status, data) = \
2286             self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
2287                                          op.depends)
2288           if not status:
2289             # Abort resolving dependencies
2290             assert ht.TNonEmptyString(data), "No error message"
2291             break
2292           # Use resolved dependencies
2293           op.depends = data
2294       else:
2295         try:
2296           job = self._SubmitJobUnlocked(job_id, ops)
2297         except errors.GenericError, err:
2298           status = False
2299           data = self._FormatSubmitError(str(err), ops)
2300         else:
2301           status = True
2302           data = job_id
2303           added_jobs.append(job)
2304
2305       results.append((status, data))
2306
2307     return (results, added_jobs)
2308
2309   @locking.ssynchronized(_LOCK)
2310   def _EnqueueJobs(self, jobs):
2311     """Helper function to add jobs to worker pool's queue.
2312
2313     @type jobs: list
2314     @param jobs: List of all jobs
2315
2316     """
2317     return self._EnqueueJobsUnlocked(jobs)
2318
2319   def _EnqueueJobsUnlocked(self, jobs):
2320     """Helper function to add jobs to worker pool's queue.
2321
2322     @type jobs: list
2323     @param jobs: List of all jobs
2324
2325     """
2326     assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
2327     self._wpool.AddManyTasks([(job, ) for job in jobs],
2328                              priority=[job.CalcPriority() for job in jobs],
2329                              task_id=map(_GetIdAttr, jobs))
2330
2331   def _GetJobStatusForDependencies(self, job_id):
2332     """Gets the status of a job for dependencies.
2333
2334     @type job_id: int
2335     @param job_id: Job ID
2336     @raise errors.JobLost: If job can't be found
2337
2338     """
2339     # Not using in-memory cache as doing so would require an exclusive lock
2340
2341     # Try to load from disk
2342     job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2343
2344     assert not job.writable, "Got writable job" # pylint: disable=E1101
2345
2346     if job:
2347       return job.CalcStatus()
2348
2349     raise errors.JobLost("Job %s not found" % job_id)
2350
2351   @_RequireOpenQueue
2352   def UpdateJobUnlocked(self, job, replicate=True):
2353     """Update a job's on disk storage.
2354
2355     After a job has been modified, this function needs to be called in
2356     order to write the changes to disk and replicate them to the other
2357     nodes.
2358
2359     @type job: L{_QueuedJob}
2360     @param job: the changed job
2361     @type replicate: boolean
2362     @param replicate: whether to replicate the change to remote nodes
2363
2364     """
2365     if __debug__:
2366       finalized = job.CalcStatus() in constants.JOBS_FINALIZED
2367       assert (finalized ^ (job.end_timestamp is None))
2368       assert job.writable, "Can't update read-only job"
2369       assert not job.archived, "Can't update archived job"
2370
2371     filename = self._GetJobPath(job.id)
2372     data = serializer.DumpJson(job.Serialize())
2373     logging.debug("Writing job %s to %s", job.id, filename)
2374     self._UpdateJobQueueFile(filename, data, replicate)
2375
2376   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
2377                         timeout):
2378     """Waits for changes in a job.
2379
2380     @type job_id: int
2381     @param job_id: Job identifier
2382     @type fields: list of strings
2383     @param fields: Which fields to check for changes
2384     @type prev_job_info: list or None
2385     @param prev_job_info: Last job information returned
2386     @type prev_log_serial: int
2387     @param prev_log_serial: Last job message serial number
2388     @type timeout: float
2389     @param timeout: maximum time to wait in seconds
2390     @rtype: tuple (job info, log entries)
2391     @return: a tuple of the job information as required via
2392         the fields parameter, and the log entries as a list
2393
2394         if the job has not changed and the timeout has expired,
2395         we instead return a special value,
2396         L{constants.JOB_NOTCHANGED}, which should be interpreted
2397         as such by the clients
2398
2399     """
2400     load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
2401                              writable=False)
2402
2403     helper = _WaitForJobChangesHelper()
2404
2405     return helper(self._GetJobPath(job_id), load_fn,
2406                   fields, prev_job_info, prev_log_serial, timeout)
2407
2408   @locking.ssynchronized(_LOCK)
2409   @_RequireOpenQueue
2410   def CancelJob(self, job_id):
2411     """Cancels a job.
2412
2413     This will only succeed if the job has not started yet.
2414
2415     @type job_id: int
2416     @param job_id: job ID of job to be cancelled.
2417
2418     """
2419     logging.info("Cancelling job %s", job_id)
2420
2421     return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
2422
2423   @locking.ssynchronized(_LOCK)
2424   @_RequireOpenQueue
2425   def ChangeJobPriority(self, job_id, priority):
2426     """Changes a job's priority.
2427
2428     @type job_id: int
2429     @param job_id: ID of the job whose priority should be changed
2430     @type priority: int
2431     @param priority: New priority
2432
2433     """
2434     logging.info("Changing priority of job %s to %s", job_id, priority)
2435
2436     if priority not in constants.OP_PRIO_SUBMIT_VALID:
2437       allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2438       raise errors.GenericError("Invalid priority %s, allowed are %s" %
2439                                 (priority, allowed))
2440
2441     def fn(job):
2442       (success, msg) = job.ChangePriority(priority)
2443
2444       if success:
2445         try:
2446           self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
2447         except workerpool.NoSuchTask:
2448           logging.debug("Job %s is not in workerpool at this time", job.id)
2449
2450       return (success, msg)
2451
2452     return self._ModifyJobUnlocked(job_id, fn)
2453
2454   def _ModifyJobUnlocked(self, job_id, mod_fn):
2455     """Modifies a job.
2456
2457     @type job_id: int
2458     @param job_id: Job ID
2459     @type mod_fn: callable
2460     @param mod_fn: Modifying function, receiving job object as parameter,
2461       returning tuple of (status boolean, message string)
2462
2463     """
2464     job = self._LoadJobUnlocked(job_id)
2465     if not job:
2466       logging.debug("Job %s not found", job_id)
2467       return (False, "Job %s not found" % job_id)
2468
2469     assert job.writable, "Can't modify read-only job"
2470     assert not job.archived, "Can't modify archived job"
2471
2472     (success, msg) = mod_fn(job)
2473
2474     if success:
2475       # If the job was finalized (e.g. cancelled), this is the final write
2476       # allowed. The job can be archived anytime.
2477       self.UpdateJobUnlocked(job)
2478
2479     return (success, msg)
2480
2481   @_RequireOpenQueue
2482   def _ArchiveJobsUnlocked(self, jobs):
2483     """Archives jobs.
2484
2485     @type jobs: list of L{_QueuedJob}
2486     @param jobs: Job objects
2487     @rtype: int
2488     @return: Number of archived jobs
2489
2490     """
2491     archive_jobs = []
2492     rename_files = []
2493     for job in jobs:
2494       assert job.writable, "Can't archive read-only job"
2495       assert not job.archived, "Can't cancel archived job"
2496
2497       if job.CalcStatus() not in constants.JOBS_FINALIZED:
2498         logging.debug("Job %s is not yet done", job.id)
2499         continue
2500
2501       archive_jobs.append(job)
2502
2503       old = self._GetJobPath(job.id)
2504       new = self._GetArchivedJobPath(job.id)
2505       rename_files.append((old, new))
2506
2507     # TODO: What if 1..n files fail to rename?
2508     self._RenameFilesUnlocked(rename_files)
2509
2510     logging.debug("Successfully archived job(s) %s",
2511                   utils.CommaJoin(job.id for job in archive_jobs))
2512
2513     # Since we haven't quite checked, above, if we succeeded or failed renaming
2514     # the files, we update the cached queue size from the filesystem. When we
2515     # get around to fix the TODO: above, we can use the number of actually
2516     # archived jobs to fix this.
2517     self._UpdateQueueSizeUnlocked()
2518     return len(archive_jobs)
2519
2520   @locking.ssynchronized(_LOCK)
2521   @_RequireOpenQueue
2522   def ArchiveJob(self, job_id):
2523     """Archives a job.
2524
2525     This is just a wrapper over L{_ArchiveJobsUnlocked}.
2526
2527     @type job_id: int
2528     @param job_id: Job ID of job to be archived.
2529     @rtype: bool
2530     @return: Whether job was archived
2531
2532     """
2533     logging.info("Archiving job %s", job_id)
2534
2535     job = self._LoadJobUnlocked(job_id)
2536     if not job:
2537       logging.debug("Job %s not found", job_id)
2538       return False
2539
2540     return self._ArchiveJobsUnlocked([job]) == 1
2541
2542   @locking.ssynchronized(_LOCK)
2543   @_RequireOpenQueue
2544   def AutoArchiveJobs(self, age, timeout):
2545     """Archives all jobs based on age.
2546
2547     The method will archive all jobs which are older than the age
2548     parameter. For jobs that don't have an end timestamp, the start
2549     timestamp will be considered. The special '-1' age will cause
2550     archival of all jobs (that are not running or queued).
2551
2552     @type age: int
2553     @param age: the minimum age in seconds
2554
2555     """
2556     logging.info("Archiving jobs with age more than %s seconds", age)
2557
2558     now = time.time()
2559     end_time = now + timeout
2560     archived_count = 0
2561     last_touched = 0
2562
2563     all_job_ids = self._GetJobIDsUnlocked()
2564     pending = []
2565     for idx, job_id in enumerate(all_job_ids):
2566       last_touched = idx + 1
2567
2568       # Not optimal because jobs could be pending
2569       # TODO: Measure average duration for job archival and take number of
2570       # pending jobs into account.
2571       if time.time() > end_time:
2572         break
2573
2574       # Returns None if the job failed to load
2575       job = self._LoadJobUnlocked(job_id)
2576       if job:
2577         if job.end_timestamp is None:
2578           if job.start_timestamp is None:
2579             job_age = job.received_timestamp
2580           else:
2581             job_age = job.start_timestamp
2582         else:
2583           job_age = job.end_timestamp
2584
2585         if age == -1 or now - job_age[0] > age:
2586           pending.append(job)
2587
2588           # Archive 10 jobs at a time
2589           if len(pending) >= 10:
2590             archived_count += self._ArchiveJobsUnlocked(pending)
2591             pending = []
2592
2593     if pending:
2594       archived_count += self._ArchiveJobsUnlocked(pending)
2595
2596     return (archived_count, len(all_job_ids) - last_touched)
2597
2598   def _Query(self, fields, qfilter):
2599     qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
2600                        namefield="id")
2601
2602     # Archived jobs are only looked at if the "archived" field is referenced
2603     # either as a requested field or in the filter. By default archived jobs
2604     # are ignored.
2605     include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())
2606
2607     job_ids = qobj.RequestedNames()
2608
2609     list_all = (job_ids is None)
2610
2611     if list_all:
2612       # Since files are added to/removed from the queue atomically, there's no
2613       # risk of getting the job ids in an inconsistent state.
2614       job_ids = self._GetJobIDsUnlocked(archived=include_archived)
2615
2616     jobs = []
2617
2618     for job_id in job_ids:
2619       job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2620       if job is not None or not list_all:
2621         jobs.append((job_id, job))
2622
2623     return (qobj, jobs, list_all)
2624
2625   def QueryJobs(self, fields, qfilter):
2626     """Returns a list of jobs in queue.
2627
2628     @type fields: sequence
2629     @param fields: List of wanted fields
2630     @type qfilter: None or query2 filter (list)
2631     @param qfilter: Query filter
2632
2633     """
2634     (qobj, ctx, _) = self._Query(fields, qfilter)
2635
2636     return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
2637
2638   def OldStyleQueryJobs(self, job_ids, fields):
2639     """Returns a list of jobs in queue.
2640
2641     @type job_ids: list
2642     @param job_ids: sequence of job identifiers or None for all
2643     @type fields: list
2644     @param fields: names of fields to return
2645     @rtype: list
2646     @return: list one element per job, each element being list with
2647         the requested fields
2648
2649     """
2650     # backwards compat:
2651     job_ids = [int(jid) for jid in job_ids]
2652     qfilter = qlang.MakeSimpleFilter("id", job_ids)
2653
2654     (qobj, ctx, _) = self._Query(fields, qfilter)
2655
2656     return qobj.OldStyleQuery(ctx, sort_by_name=False)
2657
2658   @locking.ssynchronized(_LOCK)
2659   def PrepareShutdown(self):
2660     """Prepare to stop the job queue.
2661
2662     Disables execution of jobs in the workerpool and returns whether there are
2663     any jobs currently running. If the latter is the case, the job queue is not
2664     yet ready for shutdown. Once this function returns C{True} L{Shutdown} can
2665     be called without interfering with any job. Queued and unfinished jobs will
2666     be resumed next time.
2667
2668     Once this function has been called no new job submissions will be accepted
2669     (see L{_RequireNonDrainedQueue}).
2670
2671     @rtype: bool
2672     @return: Whether there are any running jobs
2673
2674     """
2675     if self._accepting_jobs:
2676       self._accepting_jobs = False
2677
2678       # Tell worker pool to stop processing pending tasks
2679       self._wpool.SetActive(False)
2680
2681     return self._wpool.HasRunningTasks()
2682
2683   def AcceptingJobsUnlocked(self):
2684     """Returns whether jobs are accepted.
2685
2686     Once L{PrepareShutdown} has been called, no new jobs are accepted and the
2687     queue is shutting down.
2688
2689     @rtype: bool
2690
2691     """
2692     return self._accepting_jobs
2693
2694   @locking.ssynchronized(_LOCK)
2695   @_RequireOpenQueue
2696   def Shutdown(self):
2697     """Stops the job queue.
2698
2699     This shutdowns all the worker threads an closes the queue.
2700
2701     """
2702     self._wpool.TerminateWorkers()
2703
2704     self._queue_filelock.Close()
2705     self._queue_filelock = None