gnt-job: List archived jobs if requested
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import logging
33 import errno
34 import time
35 import weakref
36 import threading
37 import itertools
38
39 try:
40   # pylint: disable=E0611
41   from pyinotify import pyinotify
42 except ImportError:
43   import pyinotify
44
45 from ganeti import asyncnotifier
46 from ganeti import constants
47 from ganeti import serializer
48 from ganeti import workerpool
49 from ganeti import locking
50 from ganeti import opcodes
51 from ganeti import errors
52 from ganeti import mcpu
53 from ganeti import utils
54 from ganeti import jstore
55 from ganeti import rpc
56 from ganeti import runtime
57 from ganeti import netutils
58 from ganeti import compat
59 from ganeti import ht
60 from ganeti import query
61 from ganeti import qlang
62 from ganeti import pathutils
63 from ganeti import vcluster
64
65
66 JOBQUEUE_THREADS = 25
67
68 # member lock names to be passed to @ssynchronized decorator
69 _LOCK = "_lock"
70 _QUEUE = "_queue"
71
72
73 class CancelJob(Exception):
74   """Special exception to cancel a job.
75
76   """
77
78
79 def TimeStampNow():
80   """Returns the current timestamp.
81
82   @rtype: tuple
83   @return: the current time in the (seconds, microseconds) format
84
85   """
86   return utils.SplitTime(time.time())
87
88
89 def _CallJqUpdate(runner, names, file_name, content):
90   """Updates job queue file after virtualizing filename.
91
92   """
93   virt_file_name = vcluster.MakeVirtualPath(file_name)
94   return runner.call_jobqueue_update(names, virt_file_name, content)
95
96
97 class _SimpleJobQuery:
98   """Wrapper for job queries.
99
100   Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.
101
102   """
103   def __init__(self, fields):
104     """Initializes this class.
105
106     """
107     self._query = query.Query(query.JOB_FIELDS, fields)
108
109   def __call__(self, job):
110     """Executes a job query using cached field list.
111
112     """
113     return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
114
115
116 class _QueuedOpCode(object):
117   """Encapsulates an opcode object.
118
119   @ivar log: holds the execution log and consists of tuples
120   of the form C{(log_serial, timestamp, level, message)}
121   @ivar input: the OpCode we encapsulate
122   @ivar status: the current status
123   @ivar result: the result of the LU execution
124   @ivar start_timestamp: timestamp for the start of the execution
125   @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
126   @ivar stop_timestamp: timestamp for the end of the execution
127
128   """
129   __slots__ = ["input", "status", "result", "log", "priority",
130                "start_timestamp", "exec_timestamp", "end_timestamp",
131                "__weakref__"]
132
133   def __init__(self, op):
134     """Initializes instances of this class.
135
136     @type op: L{opcodes.OpCode}
137     @param op: the opcode we encapsulate
138
139     """
140     self.input = op
141     self.status = constants.OP_STATUS_QUEUED
142     self.result = None
143     self.log = []
144     self.start_timestamp = None
145     self.exec_timestamp = None
146     self.end_timestamp = None
147
148     # Get initial priority (it might change during the lifetime of this opcode)
149     self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
150
151   @classmethod
152   def Restore(cls, state):
153     """Restore the _QueuedOpCode from the serialized form.
154
155     @type state: dict
156     @param state: the serialized state
157     @rtype: _QueuedOpCode
158     @return: a new _QueuedOpCode instance
159
160     """
161     obj = _QueuedOpCode.__new__(cls)
162     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
163     obj.status = state["status"]
164     obj.result = state["result"]
165     obj.log = state["log"]
166     obj.start_timestamp = state.get("start_timestamp", None)
167     obj.exec_timestamp = state.get("exec_timestamp", None)
168     obj.end_timestamp = state.get("end_timestamp", None)
169     obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
170     return obj
171
172   def Serialize(self):
173     """Serializes this _QueuedOpCode.
174
175     @rtype: dict
176     @return: the dictionary holding the serialized state
177
178     """
179     return {
180       "input": self.input.__getstate__(),
181       "status": self.status,
182       "result": self.result,
183       "log": self.log,
184       "start_timestamp": self.start_timestamp,
185       "exec_timestamp": self.exec_timestamp,
186       "end_timestamp": self.end_timestamp,
187       "priority": self.priority,
188       }
189
190
191 class _QueuedJob(object):
192   """In-memory job representation.
193
194   This is what we use to track the user-submitted jobs. Locking must
195   be taken care of by users of this class.
196
197   @type queue: L{JobQueue}
198   @ivar queue: the parent queue
199   @ivar id: the job ID
200   @type ops: list
201   @ivar ops: the list of _QueuedOpCode that constitute the job
202   @type log_serial: int
203   @ivar log_serial: holds the index for the next log entry
204   @ivar received_timestamp: the timestamp for when the job was received
205   @ivar start_timestmap: the timestamp for start of execution
206   @ivar end_timestamp: the timestamp for end of execution
207   @ivar writable: Whether the job is allowed to be modified
208
209   """
210   # pylint: disable=W0212
211   __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
212                "received_timestamp", "start_timestamp", "end_timestamp",
213                "__weakref__", "processor_lock", "writable", "archived"]
214
215   def __init__(self, queue, job_id, ops, writable):
216     """Constructor for the _QueuedJob.
217
218     @type queue: L{JobQueue}
219     @param queue: our parent queue
220     @type job_id: job_id
221     @param job_id: our job id
222     @type ops: list
223     @param ops: the list of opcodes we hold, which will be encapsulated
224         in _QueuedOpCodes
225     @type writable: bool
226     @param writable: Whether job can be modified
227
228     """
229     if not ops:
230       raise errors.GenericError("A job needs at least one opcode")
231
232     self.queue = queue
233     self.id = int(job_id)
234     self.ops = [_QueuedOpCode(op) for op in ops]
235     self.log_serial = 0
236     self.received_timestamp = TimeStampNow()
237     self.start_timestamp = None
238     self.end_timestamp = None
239     self.archived = False
240
241     self._InitInMemory(self, writable)
242
243     assert not self.archived, "New jobs can not be marked as archived"
244
245   @staticmethod
246   def _InitInMemory(obj, writable):
247     """Initializes in-memory variables.
248
249     """
250     obj.writable = writable
251     obj.ops_iter = None
252     obj.cur_opctx = None
253
254     # Read-only jobs are not processed and therefore don't need a lock
255     if writable:
256       obj.processor_lock = threading.Lock()
257     else:
258       obj.processor_lock = None
259
260   def __repr__(self):
261     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
262               "id=%s" % self.id,
263               "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
264
265     return "<%s at %#x>" % (" ".join(status), id(self))
266
267   @classmethod
268   def Restore(cls, queue, state, writable, archived):
269     """Restore a _QueuedJob from serialized state:
270
271     @type queue: L{JobQueue}
272     @param queue: to which queue the restored job belongs
273     @type state: dict
274     @param state: the serialized state
275     @type writable: bool
276     @param writable: Whether job can be modified
277     @type archived: bool
278     @param archived: Whether job was already archived
279     @rtype: _JobQueue
280     @return: the restored _JobQueue instance
281
282     """
283     obj = _QueuedJob.__new__(cls)
284     obj.queue = queue
285     obj.id = int(state["id"])
286     obj.received_timestamp = state.get("received_timestamp", None)
287     obj.start_timestamp = state.get("start_timestamp", None)
288     obj.end_timestamp = state.get("end_timestamp", None)
289     obj.archived = archived
290
291     obj.ops = []
292     obj.log_serial = 0
293     for op_state in state["ops"]:
294       op = _QueuedOpCode.Restore(op_state)
295       for log_entry in op.log:
296         obj.log_serial = max(obj.log_serial, log_entry[0])
297       obj.ops.append(op)
298
299     cls._InitInMemory(obj, writable)
300
301     return obj
302
303   def Serialize(self):
304     """Serialize the _JobQueue instance.
305
306     @rtype: dict
307     @return: the serialized state
308
309     """
310     return {
311       "id": self.id,
312       "ops": [op.Serialize() for op in self.ops],
313       "start_timestamp": self.start_timestamp,
314       "end_timestamp": self.end_timestamp,
315       "received_timestamp": self.received_timestamp,
316       }
317
318   def CalcStatus(self):
319     """Compute the status of this job.
320
321     This function iterates over all the _QueuedOpCodes in the job and
322     based on their status, computes the job status.
323
324     The algorithm is:
325       - if we find a cancelled, or finished with error, the job
326         status will be the same
327       - otherwise, the last opcode with the status one of:
328           - waitlock
329           - canceling
330           - running
331
332         will determine the job status
333
334       - otherwise, it means either all opcodes are queued, or success,
335         and the job status will be the same
336
337     @return: the job status
338
339     """
340     status = constants.JOB_STATUS_QUEUED
341
342     all_success = True
343     for op in self.ops:
344       if op.status == constants.OP_STATUS_SUCCESS:
345         continue
346
347       all_success = False
348
349       if op.status == constants.OP_STATUS_QUEUED:
350         pass
351       elif op.status == constants.OP_STATUS_WAITING:
352         status = constants.JOB_STATUS_WAITING
353       elif op.status == constants.OP_STATUS_RUNNING:
354         status = constants.JOB_STATUS_RUNNING
355       elif op.status == constants.OP_STATUS_CANCELING:
356         status = constants.JOB_STATUS_CANCELING
357         break
358       elif op.status == constants.OP_STATUS_ERROR:
359         status = constants.JOB_STATUS_ERROR
360         # The whole job fails if one opcode failed
361         break
362       elif op.status == constants.OP_STATUS_CANCELED:
363         status = constants.OP_STATUS_CANCELED
364         break
365
366     if all_success:
367       status = constants.JOB_STATUS_SUCCESS
368
369     return status
370
371   def CalcPriority(self):
372     """Gets the current priority for this job.
373
374     Only unfinished opcodes are considered. When all are done, the default
375     priority is used.
376
377     @rtype: int
378
379     """
380     priorities = [op.priority for op in self.ops
381                   if op.status not in constants.OPS_FINALIZED]
382
383     if not priorities:
384       # All opcodes are done, assume default priority
385       return constants.OP_PRIO_DEFAULT
386
387     return min(priorities)
388
389   def GetLogEntries(self, newer_than):
390     """Selectively returns the log entries.
391
392     @type newer_than: None or int
393     @param newer_than: if this is None, return all log entries,
394         otherwise return only the log entries with serial higher
395         than this value
396     @rtype: list
397     @return: the list of the log entries selected
398
399     """
400     if newer_than is None:
401       serial = -1
402     else:
403       serial = newer_than
404
405     entries = []
406     for op in self.ops:
407       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
408
409     return entries
410
411   def GetInfo(self, fields):
412     """Returns information about a job.
413
414     @type fields: list
415     @param fields: names of fields to return
416     @rtype: list
417     @return: list with one element for each field
418     @raise errors.OpExecError: when an invalid field
419         has been passed
420
421     """
422     return _SimpleJobQuery(fields)(self)
423
424   def MarkUnfinishedOps(self, status, result):
425     """Mark unfinished opcodes with a given status and result.
426
427     This is an utility function for marking all running or waiting to
428     be run opcodes with a given status. Opcodes which are already
429     finalised are not changed.
430
431     @param status: a given opcode status
432     @param result: the opcode result
433
434     """
435     not_marked = True
436     for op in self.ops:
437       if op.status in constants.OPS_FINALIZED:
438         assert not_marked, "Finalized opcodes found after non-finalized ones"
439         continue
440       op.status = status
441       op.result = result
442       not_marked = False
443
444   def Finalize(self):
445     """Marks the job as finalized.
446
447     """
448     self.end_timestamp = TimeStampNow()
449
450   def Cancel(self):
451     """Marks job as canceled/-ing if possible.
452
453     @rtype: tuple; (bool, string)
454     @return: Boolean describing whether job was successfully canceled or marked
455       as canceling and a text message
456
457     """
458     status = self.CalcStatus()
459
460     if status == constants.JOB_STATUS_QUEUED:
461       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
462                              "Job canceled by request")
463       self.Finalize()
464       return (True, "Job %s canceled" % self.id)
465
466     elif status == constants.JOB_STATUS_WAITING:
467       # The worker will notice the new status and cancel the job
468       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
469       return (True, "Job %s will be canceled" % self.id)
470
471     else:
472       logging.debug("Job %s is no longer waiting in the queue", self.id)
473       return (False, "Job %s is no longer waiting in the queue" % self.id)
474
475
476 class _OpExecCallbacks(mcpu.OpExecCbBase):
477   def __init__(self, queue, job, op):
478     """Initializes this class.
479
480     @type queue: L{JobQueue}
481     @param queue: Job queue
482     @type job: L{_QueuedJob}
483     @param job: Job object
484     @type op: L{_QueuedOpCode}
485     @param op: OpCode
486
487     """
488     assert queue, "Queue is missing"
489     assert job, "Job is missing"
490     assert op, "Opcode is missing"
491
492     self._queue = queue
493     self._job = job
494     self._op = op
495
496   def _CheckCancel(self):
497     """Raises an exception to cancel the job if asked to.
498
499     """
500     # Cancel here if we were asked to
501     if self._op.status == constants.OP_STATUS_CANCELING:
502       logging.debug("Canceling opcode")
503       raise CancelJob()
504
505   @locking.ssynchronized(_QUEUE, shared=1)
506   def NotifyStart(self):
507     """Mark the opcode as running, not lock-waiting.
508
509     This is called from the mcpu code as a notifier function, when the LU is
510     finally about to start the Exec() method. Of course, to have end-user
511     visible results, the opcode must be initially (before calling into
512     Processor.ExecOpCode) set to OP_STATUS_WAITING.
513
514     """
515     assert self._op in self._job.ops
516     assert self._op.status in (constants.OP_STATUS_WAITING,
517                                constants.OP_STATUS_CANCELING)
518
519     # Cancel here if we were asked to
520     self._CheckCancel()
521
522     logging.debug("Opcode is now running")
523
524     self._op.status = constants.OP_STATUS_RUNNING
525     self._op.exec_timestamp = TimeStampNow()
526
527     # And finally replicate the job status
528     self._queue.UpdateJobUnlocked(self._job)
529
530   @locking.ssynchronized(_QUEUE, shared=1)
531   def _AppendFeedback(self, timestamp, log_type, log_msg):
532     """Internal feedback append function, with locks
533
534     """
535     self._job.log_serial += 1
536     self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
537     self._queue.UpdateJobUnlocked(self._job, replicate=False)
538
539   def Feedback(self, *args):
540     """Append a log entry.
541
542     """
543     assert len(args) < 3
544
545     if len(args) == 1:
546       log_type = constants.ELOG_MESSAGE
547       log_msg = args[0]
548     else:
549       (log_type, log_msg) = args
550
551     # The time is split to make serialization easier and not lose
552     # precision.
553     timestamp = utils.SplitTime(time.time())
554     self._AppendFeedback(timestamp, log_type, log_msg)
555
556   def CheckCancel(self):
557     """Check whether job has been cancelled.
558
559     """
560     assert self._op.status in (constants.OP_STATUS_WAITING,
561                                constants.OP_STATUS_CANCELING)
562
563     # Cancel here if we were asked to
564     self._CheckCancel()
565
566   def SubmitManyJobs(self, jobs):
567     """Submits jobs for processing.
568
569     See L{JobQueue.SubmitManyJobs}.
570
571     """
572     # Locking is done in job queue
573     return self._queue.SubmitManyJobs(jobs)
574
575
576 class _JobChangesChecker(object):
577   def __init__(self, fields, prev_job_info, prev_log_serial):
578     """Initializes this class.
579
580     @type fields: list of strings
581     @param fields: Fields requested by LUXI client
582     @type prev_job_info: string
583     @param prev_job_info: previous job info, as passed by the LUXI client
584     @type prev_log_serial: string
585     @param prev_log_serial: previous job serial, as passed by the LUXI client
586
587     """
588     self._squery = _SimpleJobQuery(fields)
589     self._prev_job_info = prev_job_info
590     self._prev_log_serial = prev_log_serial
591
592   def __call__(self, job):
593     """Checks whether job has changed.
594
595     @type job: L{_QueuedJob}
596     @param job: Job object
597
598     """
599     assert not job.writable, "Expected read-only job"
600
601     status = job.CalcStatus()
602     job_info = self._squery(job)
603     log_entries = job.GetLogEntries(self._prev_log_serial)
604
605     # Serializing and deserializing data can cause type changes (e.g. from
606     # tuple to list) or precision loss. We're doing it here so that we get
607     # the same modifications as the data received from the client. Without
608     # this, the comparison afterwards might fail without the data being
609     # significantly different.
610     # TODO: we just deserialized from disk, investigate how to make sure that
611     # the job info and log entries are compatible to avoid this further step.
612     # TODO: Doing something like in testutils.py:UnifyValueType might be more
613     # efficient, though floats will be tricky
614     job_info = serializer.LoadJson(serializer.DumpJson(job_info))
615     log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
616
617     # Don't even try to wait if the job is no longer running, there will be
618     # no changes.
619     if (status not in (constants.JOB_STATUS_QUEUED,
620                        constants.JOB_STATUS_RUNNING,
621                        constants.JOB_STATUS_WAITING) or
622         job_info != self._prev_job_info or
623         (log_entries and self._prev_log_serial != log_entries[0][0])):
624       logging.debug("Job %s changed", job.id)
625       return (job_info, log_entries)
626
627     return None
628
629
630 class _JobFileChangesWaiter(object):
631   def __init__(self, filename):
632     """Initializes this class.
633
634     @type filename: string
635     @param filename: Path to job file
636     @raises errors.InotifyError: if the notifier cannot be setup
637
638     """
639     self._wm = pyinotify.WatchManager()
640     self._inotify_handler = \
641       asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
642     self._notifier = \
643       pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
644     try:
645       self._inotify_handler.enable()
646     except Exception:
647       # pyinotify doesn't close file descriptors automatically
648       self._notifier.stop()
649       raise
650
651   def _OnInotify(self, notifier_enabled):
652     """Callback for inotify.
653
654     """
655     if not notifier_enabled:
656       self._inotify_handler.enable()
657
658   def Wait(self, timeout):
659     """Waits for the job file to change.
660
661     @type timeout: float
662     @param timeout: Timeout in seconds
663     @return: Whether there have been events
664
665     """
666     assert timeout >= 0
667     have_events = self._notifier.check_events(timeout * 1000)
668     if have_events:
669       self._notifier.read_events()
670     self._notifier.process_events()
671     return have_events
672
673   def Close(self):
674     """Closes underlying notifier and its file descriptor.
675
676     """
677     self._notifier.stop()
678
679
680 class _JobChangesWaiter(object):
681   def __init__(self, filename):
682     """Initializes this class.
683
684     @type filename: string
685     @param filename: Path to job file
686
687     """
688     self._filewaiter = None
689     self._filename = filename
690
691   def Wait(self, timeout):
692     """Waits for a job to change.
693
694     @type timeout: float
695     @param timeout: Timeout in seconds
696     @return: Whether there have been events
697
698     """
699     if self._filewaiter:
700       return self._filewaiter.Wait(timeout)
701
702     # Lazy setup: Avoid inotify setup cost when job file has already changed.
703     # If this point is reached, return immediately and let caller check the job
704     # file again in case there were changes since the last check. This avoids a
705     # race condition.
706     self._filewaiter = _JobFileChangesWaiter(self._filename)
707
708     return True
709
710   def Close(self):
711     """Closes underlying waiter.
712
713     """
714     if self._filewaiter:
715       self._filewaiter.Close()
716
717
718 class _WaitForJobChangesHelper(object):
719   """Helper class using inotify to wait for changes in a job file.
720
721   This class takes a previous job status and serial, and alerts the client when
722   the current job status has changed.
723
724   """
725   @staticmethod
726   def _CheckForChanges(counter, job_load_fn, check_fn):
727     if counter.next() > 0:
728       # If this isn't the first check the job is given some more time to change
729       # again. This gives better performance for jobs generating many
730       # changes/messages.
731       time.sleep(0.1)
732
733     job = job_load_fn()
734     if not job:
735       raise errors.JobLost()
736
737     result = check_fn(job)
738     if result is None:
739       raise utils.RetryAgain()
740
741     return result
742
743   def __call__(self, filename, job_load_fn,
744                fields, prev_job_info, prev_log_serial, timeout):
745     """Waits for changes on a job.
746
747     @type filename: string
748     @param filename: File on which to wait for changes
749     @type job_load_fn: callable
750     @param job_load_fn: Function to load job
751     @type fields: list of strings
752     @param fields: Which fields to check for changes
753     @type prev_job_info: list or None
754     @param prev_job_info: Last job information returned
755     @type prev_log_serial: int
756     @param prev_log_serial: Last job message serial number
757     @type timeout: float
758     @param timeout: maximum time to wait in seconds
759
760     """
761     counter = itertools.count()
762     try:
763       check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
764       waiter = _JobChangesWaiter(filename)
765       try:
766         return utils.Retry(compat.partial(self._CheckForChanges,
767                                           counter, job_load_fn, check_fn),
768                            utils.RETRY_REMAINING_TIME, timeout,
769                            wait_fn=waiter.Wait)
770       finally:
771         waiter.Close()
772     except (errors.InotifyError, errors.JobLost):
773       return None
774     except utils.RetryTimeout:
775       return constants.JOB_NOTCHANGED
776
777
778 def _EncodeOpError(err):
779   """Encodes an error which occurred while processing an opcode.
780
781   """
782   if isinstance(err, errors.GenericError):
783     to_encode = err
784   else:
785     to_encode = errors.OpExecError(str(err))
786
787   return errors.EncodeException(to_encode)
788
789
790 class _TimeoutStrategyWrapper:
791   def __init__(self, fn):
792     """Initializes this class.
793
794     """
795     self._fn = fn
796     self._next = None
797
798   def _Advance(self):
799     """Gets the next timeout if necessary.
800
801     """
802     if self._next is None:
803       self._next = self._fn()
804
805   def Peek(self):
806     """Returns the next timeout.
807
808     """
809     self._Advance()
810     return self._next
811
812   def Next(self):
813     """Returns the current timeout and advances the internal state.
814
815     """
816     self._Advance()
817     result = self._next
818     self._next = None
819     return result
820
821
822 class _OpExecContext:
823   def __init__(self, op, index, log_prefix, timeout_strategy_factory):
824     """Initializes this class.
825
826     """
827     self.op = op
828     self.index = index
829     self.log_prefix = log_prefix
830     self.summary = op.input.Summary()
831
832     # Create local copy to modify
833     if getattr(op.input, opcodes.DEPEND_ATTR, None):
834       self.jobdeps = op.input.depends[:]
835     else:
836       self.jobdeps = None
837
838     self._timeout_strategy_factory = timeout_strategy_factory
839     self._ResetTimeoutStrategy()
840
841   def _ResetTimeoutStrategy(self):
842     """Creates a new timeout strategy.
843
844     """
845     self._timeout_strategy = \
846       _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
847
848   def CheckPriorityIncrease(self):
849     """Checks whether priority can and should be increased.
850
851     Called when locks couldn't be acquired.
852
853     """
854     op = self.op
855
856     # Exhausted all retries and next round should not use blocking acquire
857     # for locks?
858     if (self._timeout_strategy.Peek() is None and
859         op.priority > constants.OP_PRIO_HIGHEST):
860       logging.debug("Increasing priority")
861       op.priority -= 1
862       self._ResetTimeoutStrategy()
863       return True
864
865     return False
866
867   def GetNextLockTimeout(self):
868     """Returns the next lock acquire timeout.
869
870     """
871     return self._timeout_strategy.Next()
872
873
874 class _JobProcessor(object):
875   (DEFER,
876    WAITDEP,
877    FINISHED) = range(1, 4)
878
879   def __init__(self, queue, opexec_fn, job,
880                _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
881     """Initializes this class.
882
883     """
884     self.queue = queue
885     self.opexec_fn = opexec_fn
886     self.job = job
887     self._timeout_strategy_factory = _timeout_strategy_factory
888
889   @staticmethod
890   def _FindNextOpcode(job, timeout_strategy_factory):
891     """Locates the next opcode to run.
892
893     @type job: L{_QueuedJob}
894     @param job: Job object
895     @param timeout_strategy_factory: Callable to create new timeout strategy
896
897     """
898     # Create some sort of a cache to speed up locating next opcode for future
899     # lookups
900     # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
901     # pending and one for processed ops.
902     if job.ops_iter is None:
903       job.ops_iter = enumerate(job.ops)
904
905     # Find next opcode to run
906     while True:
907       try:
908         (idx, op) = job.ops_iter.next()
909       except StopIteration:
910         raise errors.ProgrammerError("Called for a finished job")
911
912       if op.status == constants.OP_STATUS_RUNNING:
913         # Found an opcode already marked as running
914         raise errors.ProgrammerError("Called for job marked as running")
915
916       opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
917                              timeout_strategy_factory)
918
919       if op.status not in constants.OPS_FINALIZED:
920         return opctx
921
922       # This is a job that was partially completed before master daemon
923       # shutdown, so it can be expected that some opcodes are already
924       # completed successfully (if any did error out, then the whole job
925       # should have been aborted and not resubmitted for processing).
926       logging.info("%s: opcode %s already processed, skipping",
927                    opctx.log_prefix, opctx.summary)
928
929   @staticmethod
930   def _MarkWaitlock(job, op):
931     """Marks an opcode as waiting for locks.
932
933     The job's start timestamp is also set if necessary.
934
935     @type job: L{_QueuedJob}
936     @param job: Job object
937     @type op: L{_QueuedOpCode}
938     @param op: Opcode object
939
940     """
941     assert op in job.ops
942     assert op.status in (constants.OP_STATUS_QUEUED,
943                          constants.OP_STATUS_WAITING)
944
945     update = False
946
947     op.result = None
948
949     if op.status == constants.OP_STATUS_QUEUED:
950       op.status = constants.OP_STATUS_WAITING
951       update = True
952
953     if op.start_timestamp is None:
954       op.start_timestamp = TimeStampNow()
955       update = True
956
957     if job.start_timestamp is None:
958       job.start_timestamp = op.start_timestamp
959       update = True
960
961     assert op.status == constants.OP_STATUS_WAITING
962
963     return update
964
965   @staticmethod
966   def _CheckDependencies(queue, job, opctx):
967     """Checks if an opcode has dependencies and if so, processes them.
968
969     @type queue: L{JobQueue}
970     @param queue: Queue object
971     @type job: L{_QueuedJob}
972     @param job: Job object
973     @type opctx: L{_OpExecContext}
974     @param opctx: Opcode execution context
975     @rtype: bool
976     @return: Whether opcode will be re-scheduled by dependency tracker
977
978     """
979     op = opctx.op
980
981     result = False
982
983     while opctx.jobdeps:
984       (dep_job_id, dep_status) = opctx.jobdeps[0]
985
986       (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
987                                                           dep_status)
988       assert ht.TNonEmptyString(depmsg), "No dependency message"
989
990       logging.info("%s: %s", opctx.log_prefix, depmsg)
991
992       if depresult == _JobDependencyManager.CONTINUE:
993         # Remove dependency and continue
994         opctx.jobdeps.pop(0)
995
996       elif depresult == _JobDependencyManager.WAIT:
997         # Need to wait for notification, dependency tracker will re-add job
998         # to workerpool
999         result = True
1000         break
1001
1002       elif depresult == _JobDependencyManager.CANCEL:
1003         # Job was cancelled, cancel this job as well
1004         job.Cancel()
1005         assert op.status == constants.OP_STATUS_CANCELING
1006         break
1007
1008       elif depresult in (_JobDependencyManager.WRONGSTATUS,
1009                          _JobDependencyManager.ERROR):
1010         # Job failed or there was an error, this job must fail
1011         op.status = constants.OP_STATUS_ERROR
1012         op.result = _EncodeOpError(errors.OpExecError(depmsg))
1013         break
1014
1015       else:
1016         raise errors.ProgrammerError("Unknown dependency result '%s'" %
1017                                      depresult)
1018
1019     return result
1020
1021   def _ExecOpCodeUnlocked(self, opctx):
1022     """Processes one opcode and returns the result.
1023
1024     """
1025     op = opctx.op
1026
1027     assert op.status == constants.OP_STATUS_WAITING
1028
1029     timeout = opctx.GetNextLockTimeout()
1030
1031     try:
1032       # Make sure not to hold queue lock while calling ExecOpCode
1033       result = self.opexec_fn(op.input,
1034                               _OpExecCallbacks(self.queue, self.job, op),
1035                               timeout=timeout, priority=op.priority)
1036     except mcpu.LockAcquireTimeout:
1037       assert timeout is not None, "Received timeout for blocking acquire"
1038       logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1039
1040       assert op.status in (constants.OP_STATUS_WAITING,
1041                            constants.OP_STATUS_CANCELING)
1042
1043       # Was job cancelled while we were waiting for the lock?
1044       if op.status == constants.OP_STATUS_CANCELING:
1045         return (constants.OP_STATUS_CANCELING, None)
1046
1047       # Stay in waitlock while trying to re-acquire lock
1048       return (constants.OP_STATUS_WAITING, None)
1049     except CancelJob:
1050       logging.exception("%s: Canceling job", opctx.log_prefix)
1051       assert op.status == constants.OP_STATUS_CANCELING
1052       return (constants.OP_STATUS_CANCELING, None)
1053     except Exception, err: # pylint: disable=W0703
1054       logging.exception("%s: Caught exception in %s",
1055                         opctx.log_prefix, opctx.summary)
1056       return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1057     else:
1058       logging.debug("%s: %s successful",
1059                     opctx.log_prefix, opctx.summary)
1060       return (constants.OP_STATUS_SUCCESS, result)
1061
1062   def __call__(self, _nextop_fn=None):
1063     """Continues execution of a job.
1064
1065     @param _nextop_fn: Callback function for tests
1066     @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1067       be deferred and C{WAITDEP} if the dependency manager
1068       (L{_JobDependencyManager}) will re-schedule the job when appropriate
1069
1070     """
1071     queue = self.queue
1072     job = self.job
1073
1074     logging.debug("Processing job %s", job.id)
1075
1076     queue.acquire(shared=1)
1077     try:
1078       opcount = len(job.ops)
1079
1080       assert job.writable, "Expected writable job"
1081
1082       # Don't do anything for finalized jobs
1083       if job.CalcStatus() in constants.JOBS_FINALIZED:
1084         return self.FINISHED
1085
1086       # Is a previous opcode still pending?
1087       if job.cur_opctx:
1088         opctx = job.cur_opctx
1089         job.cur_opctx = None
1090       else:
1091         if __debug__ and _nextop_fn:
1092           _nextop_fn()
1093         opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1094
1095       op = opctx.op
1096
1097       # Consistency check
1098       assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1099                                      constants.OP_STATUS_CANCELING)
1100                         for i in job.ops[opctx.index + 1:])
1101
1102       assert op.status in (constants.OP_STATUS_QUEUED,
1103                            constants.OP_STATUS_WAITING,
1104                            constants.OP_STATUS_CANCELING)
1105
1106       assert (op.priority <= constants.OP_PRIO_LOWEST and
1107               op.priority >= constants.OP_PRIO_HIGHEST)
1108
1109       waitjob = None
1110
1111       if op.status != constants.OP_STATUS_CANCELING:
1112         assert op.status in (constants.OP_STATUS_QUEUED,
1113                              constants.OP_STATUS_WAITING)
1114
1115         # Prepare to start opcode
1116         if self._MarkWaitlock(job, op):
1117           # Write to disk
1118           queue.UpdateJobUnlocked(job)
1119
1120         assert op.status == constants.OP_STATUS_WAITING
1121         assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1122         assert job.start_timestamp and op.start_timestamp
1123         assert waitjob is None
1124
1125         # Check if waiting for a job is necessary
1126         waitjob = self._CheckDependencies(queue, job, opctx)
1127
1128         assert op.status in (constants.OP_STATUS_WAITING,
1129                              constants.OP_STATUS_CANCELING,
1130                              constants.OP_STATUS_ERROR)
1131
1132         if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1133                                          constants.OP_STATUS_ERROR)):
1134           logging.info("%s: opcode %s waiting for locks",
1135                        opctx.log_prefix, opctx.summary)
1136
1137           assert not opctx.jobdeps, "Not all dependencies were removed"
1138
1139           queue.release()
1140           try:
1141             (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1142           finally:
1143             queue.acquire(shared=1)
1144
1145           op.status = op_status
1146           op.result = op_result
1147
1148           assert not waitjob
1149
1150         if op.status == constants.OP_STATUS_WAITING:
1151           # Couldn't get locks in time
1152           assert not op.end_timestamp
1153         else:
1154           # Finalize opcode
1155           op.end_timestamp = TimeStampNow()
1156
1157           if op.status == constants.OP_STATUS_CANCELING:
1158             assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1159                                   for i in job.ops[opctx.index:])
1160           else:
1161             assert op.status in constants.OPS_FINALIZED
1162
1163       if op.status == constants.OP_STATUS_WAITING or waitjob:
1164         finalize = False
1165
1166         if not waitjob and opctx.CheckPriorityIncrease():
1167           # Priority was changed, need to update on-disk file
1168           queue.UpdateJobUnlocked(job)
1169
1170         # Keep around for another round
1171         job.cur_opctx = opctx
1172
1173         assert (op.priority <= constants.OP_PRIO_LOWEST and
1174                 op.priority >= constants.OP_PRIO_HIGHEST)
1175
1176         # In no case must the status be finalized here
1177         assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1178
1179       else:
1180         # Ensure all opcodes so far have been successful
1181         assert (opctx.index == 0 or
1182                 compat.all(i.status == constants.OP_STATUS_SUCCESS
1183                            for i in job.ops[:opctx.index]))
1184
1185         # Reset context
1186         job.cur_opctx = None
1187
1188         if op.status == constants.OP_STATUS_SUCCESS:
1189           finalize = False
1190
1191         elif op.status == constants.OP_STATUS_ERROR:
1192           # Ensure failed opcode has an exception as its result
1193           assert errors.GetEncodedError(job.ops[opctx.index].result)
1194
1195           to_encode = errors.OpExecError("Preceding opcode failed")
1196           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1197                                 _EncodeOpError(to_encode))
1198           finalize = True
1199
1200           # Consistency check
1201           assert compat.all(i.status == constants.OP_STATUS_ERROR and
1202                             errors.GetEncodedError(i.result)
1203                             for i in job.ops[opctx.index:])
1204
1205         elif op.status == constants.OP_STATUS_CANCELING:
1206           job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1207                                 "Job canceled by request")
1208           finalize = True
1209
1210         else:
1211           raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1212
1213         if opctx.index == (opcount - 1):
1214           # Finalize on last opcode
1215           finalize = True
1216
1217         if finalize:
1218           # All opcodes have been run, finalize job
1219           job.Finalize()
1220
1221         # Write to disk. If the job status is final, this is the final write
1222         # allowed. Once the file has been written, it can be archived anytime.
1223         queue.UpdateJobUnlocked(job)
1224
1225         assert not waitjob
1226
1227         if finalize:
1228           logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1229           return self.FINISHED
1230
1231       assert not waitjob or queue.depmgr.JobWaiting(job)
1232
1233       if waitjob:
1234         return self.WAITDEP
1235       else:
1236         return self.DEFER
1237     finally:
1238       assert job.writable, "Job became read-only while being processed"
1239       queue.release()
1240
1241
1242 def _EvaluateJobProcessorResult(depmgr, job, result):
1243   """Looks at a result from L{_JobProcessor} for a job.
1244
1245   To be used in a L{_JobQueueWorker}.
1246
1247   """
1248   if result == _JobProcessor.FINISHED:
1249     # Notify waiting jobs
1250     depmgr.NotifyWaiters(job.id)
1251
1252   elif result == _JobProcessor.DEFER:
1253     # Schedule again
1254     raise workerpool.DeferTask(priority=job.CalcPriority())
1255
1256   elif result == _JobProcessor.WAITDEP:
1257     # No-op, dependency manager will re-schedule
1258     pass
1259
1260   else:
1261     raise errors.ProgrammerError("Job processor returned unknown status %s" %
1262                                  (result, ))
1263
1264
1265 class _JobQueueWorker(workerpool.BaseWorker):
1266   """The actual job workers.
1267
1268   """
1269   def RunTask(self, job): # pylint: disable=W0221
1270     """Job executor.
1271
1272     @type job: L{_QueuedJob}
1273     @param job: the job to be processed
1274
1275     """
1276     assert job.writable, "Expected writable job"
1277
1278     # Ensure only one worker is active on a single job. If a job registers for
1279     # a dependency job, and the other job notifies before the first worker is
1280     # done, the job can end up in the tasklist more than once.
1281     job.processor_lock.acquire()
1282     try:
1283       return self._RunTaskInner(job)
1284     finally:
1285       job.processor_lock.release()
1286
1287   def _RunTaskInner(self, job):
1288     """Executes a job.
1289
1290     Must be called with per-job lock acquired.
1291
1292     """
1293     queue = job.queue
1294     assert queue == self.pool.queue
1295
1296     setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1297     setname_fn(None)
1298
1299     proc = mcpu.Processor(queue.context, job.id)
1300
1301     # Create wrapper for setting thread name
1302     wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1303                                     proc.ExecOpCode)
1304
1305     _EvaluateJobProcessorResult(queue.depmgr, job,
1306                                 _JobProcessor(queue, wrap_execop_fn, job)())
1307
1308   @staticmethod
1309   def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1310     """Updates the worker thread name to include a short summary of the opcode.
1311
1312     @param setname_fn: Callable setting worker thread name
1313     @param execop_fn: Callable for executing opcode (usually
1314                       L{mcpu.Processor.ExecOpCode})
1315
1316     """
1317     setname_fn(op)
1318     try:
1319       return execop_fn(op, *args, **kwargs)
1320     finally:
1321       setname_fn(None)
1322
1323   @staticmethod
1324   def _GetWorkerName(job, op):
1325     """Sets the worker thread name.
1326
1327     @type job: L{_QueuedJob}
1328     @type op: L{opcodes.OpCode}
1329
1330     """
1331     parts = ["Job%s" % job.id]
1332
1333     if op:
1334       parts.append(op.TinySummary())
1335
1336     return "/".join(parts)
1337
1338
1339 class _JobQueueWorkerPool(workerpool.WorkerPool):
1340   """Simple class implementing a job-processing workerpool.
1341
1342   """
1343   def __init__(self, queue):
1344     super(_JobQueueWorkerPool, self).__init__("Jq",
1345                                               JOBQUEUE_THREADS,
1346                                               _JobQueueWorker)
1347     self.queue = queue
1348
1349
1350 class _JobDependencyManager:
1351   """Keeps track of job dependencies.
1352
1353   """
1354   (WAIT,
1355    ERROR,
1356    CANCEL,
1357    CONTINUE,
1358    WRONGSTATUS) = range(1, 6)
1359
1360   def __init__(self, getstatus_fn, enqueue_fn):
1361     """Initializes this class.
1362
1363     """
1364     self._getstatus_fn = getstatus_fn
1365     self._enqueue_fn = enqueue_fn
1366
1367     self._waiters = {}
1368     self._lock = locking.SharedLock("JobDepMgr")
1369
1370   @locking.ssynchronized(_LOCK, shared=1)
1371   def GetLockInfo(self, requested): # pylint: disable=W0613
1372     """Retrieves information about waiting jobs.
1373
1374     @type requested: set
1375     @param requested: Requested information, see C{query.LQ_*}
1376
1377     """
1378     # No need to sort here, that's being done by the lock manager and query
1379     # library. There are no priorities for notifying jobs, hence all show up as
1380     # one item under "pending".
1381     return [("job/%s" % job_id, None, None,
1382              [("job", [job.id for job in waiters])])
1383             for job_id, waiters in self._waiters.items()
1384             if waiters]
1385
1386   @locking.ssynchronized(_LOCK, shared=1)
1387   def JobWaiting(self, job):
1388     """Checks if a job is waiting.
1389
1390     """
1391     return compat.any(job in jobs
1392                       for jobs in self._waiters.values())
1393
1394   @locking.ssynchronized(_LOCK)
1395   def CheckAndRegister(self, job, dep_job_id, dep_status):
1396     """Checks if a dependency job has the requested status.
1397
1398     If the other job is not yet in a finalized status, the calling job will be
1399     notified (re-added to the workerpool) at a later point.
1400
1401     @type job: L{_QueuedJob}
1402     @param job: Job object
1403     @type dep_job_id: int
1404     @param dep_job_id: ID of dependency job
1405     @type dep_status: list
1406     @param dep_status: Required status
1407
1408     """
1409     assert ht.TJobId(job.id)
1410     assert ht.TJobId(dep_job_id)
1411     assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1412
1413     if job.id == dep_job_id:
1414       return (self.ERROR, "Job can't depend on itself")
1415
1416     # Get status of dependency job
1417     try:
1418       status = self._getstatus_fn(dep_job_id)
1419     except errors.JobLost, err:
1420       return (self.ERROR, "Dependency error: %s" % err)
1421
1422     assert status in constants.JOB_STATUS_ALL
1423
1424     job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1425
1426     if status not in constants.JOBS_FINALIZED:
1427       # Register for notification and wait for job to finish
1428       job_id_waiters.add(job)
1429       return (self.WAIT,
1430               "Need to wait for job %s, wanted status '%s'" %
1431               (dep_job_id, dep_status))
1432
1433     # Remove from waiters list
1434     if job in job_id_waiters:
1435       job_id_waiters.remove(job)
1436
1437     if (status == constants.JOB_STATUS_CANCELED and
1438         constants.JOB_STATUS_CANCELED not in dep_status):
1439       return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1440
1441     elif not dep_status or status in dep_status:
1442       return (self.CONTINUE,
1443               "Dependency job %s finished with status '%s'" %
1444               (dep_job_id, status))
1445
1446     else:
1447       return (self.WRONGSTATUS,
1448               "Dependency job %s finished with status '%s',"
1449               " not one of '%s' as required" %
1450               (dep_job_id, status, utils.CommaJoin(dep_status)))
1451
1452   def _RemoveEmptyWaitersUnlocked(self):
1453     """Remove all jobs without actual waiters.
1454
1455     """
1456     for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1457                    if not waiters]:
1458       del self._waiters[job_id]
1459
1460   def NotifyWaiters(self, job_id):
1461     """Notifies all jobs waiting for a certain job ID.
1462
1463     @attention: Do not call until L{CheckAndRegister} returned a status other
1464       than C{WAITDEP} for C{job_id}, or behaviour is undefined
1465     @type job_id: int
1466     @param job_id: Job ID
1467
1468     """
1469     assert ht.TJobId(job_id)
1470
1471     self._lock.acquire()
1472     try:
1473       self._RemoveEmptyWaitersUnlocked()
1474
1475       jobs = self._waiters.pop(job_id, None)
1476     finally:
1477       self._lock.release()
1478
1479     if jobs:
1480       # Re-add jobs to workerpool
1481       logging.debug("Re-adding %s jobs which were waiting for job %s",
1482                     len(jobs), job_id)
1483       self._enqueue_fn(jobs)
1484
1485
1486 def _RequireOpenQueue(fn):
1487   """Decorator for "public" functions.
1488
1489   This function should be used for all 'public' functions. That is,
1490   functions usually called from other classes. Note that this should
1491   be applied only to methods (not plain functions), since it expects
1492   that the decorated function is called with a first argument that has
1493   a '_queue_filelock' argument.
1494
1495   @warning: Use this decorator only after locking.ssynchronized
1496
1497   Example::
1498     @locking.ssynchronized(_LOCK)
1499     @_RequireOpenQueue
1500     def Example(self):
1501       pass
1502
1503   """
1504   def wrapper(self, *args, **kwargs):
1505     # pylint: disable=W0212
1506     assert self._queue_filelock is not None, "Queue should be open"
1507     return fn(self, *args, **kwargs)
1508   return wrapper
1509
1510
1511 def _RequireNonDrainedQueue(fn):
1512   """Decorator checking for a non-drained queue.
1513
1514   To be used with functions submitting new jobs.
1515
1516   """
1517   def wrapper(self, *args, **kwargs):
1518     """Wrapper function.
1519
1520     @raise errors.JobQueueDrainError: if the job queue is marked for draining
1521
1522     """
1523     # Ok when sharing the big job queue lock, as the drain file is created when
1524     # the lock is exclusive.
1525     # Needs access to protected member, pylint: disable=W0212
1526     if self._drained:
1527       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1528
1529     if not self._accepting_jobs:
1530       raise errors.JobQueueError("Job queue is shutting down, refusing job")
1531
1532     return fn(self, *args, **kwargs)
1533   return wrapper
1534
1535
1536 class JobQueue(object):
1537   """Queue used to manage the jobs.
1538
1539   """
1540   def __init__(self, context):
1541     """Constructor for JobQueue.
1542
1543     The constructor will initialize the job queue object and then
1544     start loading the current jobs from disk, either for starting them
1545     (if they were queue) or for aborting them (if they were already
1546     running).
1547
1548     @type context: GanetiContext
1549     @param context: the context object for access to the configuration
1550         data and other ganeti objects
1551
1552     """
1553     self.context = context
1554     self._memcache = weakref.WeakValueDictionary()
1555     self._my_hostname = netutils.Hostname.GetSysName()
1556
1557     # The Big JobQueue lock. If a code block or method acquires it in shared
1558     # mode safe it must guarantee concurrency with all the code acquiring it in
1559     # shared mode, including itself. In order not to acquire it at all
1560     # concurrency must be guaranteed with all code acquiring it in shared mode
1561     # and all code acquiring it exclusively.
1562     self._lock = locking.SharedLock("JobQueue")
1563
1564     self.acquire = self._lock.acquire
1565     self.release = self._lock.release
1566
1567     # Accept jobs by default
1568     self._accepting_jobs = True
1569
1570     # Initialize the queue, and acquire the filelock.
1571     # This ensures no other process is working on the job queue.
1572     self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1573
1574     # Read serial file
1575     self._last_serial = jstore.ReadSerial()
1576     assert self._last_serial is not None, ("Serial file was modified between"
1577                                            " check in jstore and here")
1578
1579     # Get initial list of nodes
1580     self._nodes = dict((n.name, n.primary_ip)
1581                        for n in self.context.cfg.GetAllNodesInfo().values()
1582                        if n.master_candidate)
1583
1584     # Remove master node
1585     self._nodes.pop(self._my_hostname, None)
1586
1587     # TODO: Check consistency across nodes
1588
1589     self._queue_size = None
1590     self._UpdateQueueSizeUnlocked()
1591     assert ht.TInt(self._queue_size)
1592     self._drained = jstore.CheckDrainFlag()
1593
1594     # Job dependencies
1595     self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1596                                         self._EnqueueJobs)
1597     self.context.glm.AddToLockMonitor(self.depmgr)
1598
1599     # Setup worker pool
1600     self._wpool = _JobQueueWorkerPool(self)
1601     try:
1602       self._InspectQueue()
1603     except:
1604       self._wpool.TerminateWorkers()
1605       raise
1606
1607   @locking.ssynchronized(_LOCK)
1608   @_RequireOpenQueue
1609   def _InspectQueue(self):
1610     """Loads the whole job queue and resumes unfinished jobs.
1611
1612     This function needs the lock here because WorkerPool.AddTask() may start a
1613     job while we're still doing our work.
1614
1615     """
1616     logging.info("Inspecting job queue")
1617
1618     restartjobs = []
1619
1620     all_job_ids = self._GetJobIDsUnlocked()
1621     jobs_count = len(all_job_ids)
1622     lastinfo = time.time()
1623     for idx, job_id in enumerate(all_job_ids):
1624       # Give an update every 1000 jobs or 10 seconds
1625       if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1626           idx == (jobs_count - 1)):
1627         logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1628                      idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1629         lastinfo = time.time()
1630
1631       job = self._LoadJobUnlocked(job_id)
1632
1633       # a failure in loading the job can cause 'None' to be returned
1634       if job is None:
1635         continue
1636
1637       status = job.CalcStatus()
1638
1639       if status == constants.JOB_STATUS_QUEUED:
1640         restartjobs.append(job)
1641
1642       elif status in (constants.JOB_STATUS_RUNNING,
1643                       constants.JOB_STATUS_WAITING,
1644                       constants.JOB_STATUS_CANCELING):
1645         logging.warning("Unfinished job %s found: %s", job.id, job)
1646
1647         if status == constants.JOB_STATUS_WAITING:
1648           # Restart job
1649           job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1650           restartjobs.append(job)
1651         else:
1652           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1653                                 "Unclean master daemon shutdown")
1654           job.Finalize()
1655
1656         self.UpdateJobUnlocked(job)
1657
1658     if restartjobs:
1659       logging.info("Restarting %s jobs", len(restartjobs))
1660       self._EnqueueJobsUnlocked(restartjobs)
1661
1662     logging.info("Job queue inspection finished")
1663
1664   def _GetRpc(self, address_list):
1665     """Gets RPC runner with context.
1666
1667     """
1668     return rpc.JobQueueRunner(self.context, address_list)
1669
1670   @locking.ssynchronized(_LOCK)
1671   @_RequireOpenQueue
1672   def AddNode(self, node):
1673     """Register a new node with the queue.
1674
1675     @type node: L{objects.Node}
1676     @param node: the node object to be added
1677
1678     """
1679     node_name = node.name
1680     assert node_name != self._my_hostname
1681
1682     # Clean queue directory on added node
1683     result = self._GetRpc(None).call_jobqueue_purge(node_name)
1684     msg = result.fail_msg
1685     if msg:
1686       logging.warning("Cannot cleanup queue directory on node %s: %s",
1687                       node_name, msg)
1688
1689     if not node.master_candidate:
1690       # remove if existing, ignoring errors
1691       self._nodes.pop(node_name, None)
1692       # and skip the replication of the job ids
1693       return
1694
1695     # Upload the whole queue excluding archived jobs
1696     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1697
1698     # Upload current serial file
1699     files.append(pathutils.JOB_QUEUE_SERIAL_FILE)
1700
1701     # Static address list
1702     addrs = [node.primary_ip]
1703
1704     for file_name in files:
1705       # Read file content
1706       content = utils.ReadFile(file_name)
1707
1708       result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
1709                              file_name, content)
1710       msg = result[node_name].fail_msg
1711       if msg:
1712         logging.error("Failed to upload file %s to node %s: %s",
1713                       file_name, node_name, msg)
1714
1715     self._nodes[node_name] = node.primary_ip
1716
1717   @locking.ssynchronized(_LOCK)
1718   @_RequireOpenQueue
1719   def RemoveNode(self, node_name):
1720     """Callback called when removing nodes from the cluster.
1721
1722     @type node_name: str
1723     @param node_name: the name of the node to remove
1724
1725     """
1726     self._nodes.pop(node_name, None)
1727
1728   @staticmethod
1729   def _CheckRpcResult(result, nodes, failmsg):
1730     """Verifies the status of an RPC call.
1731
1732     Since we aim to keep consistency should this node (the current
1733     master) fail, we will log errors if our rpc fail, and especially
1734     log the case when more than half of the nodes fails.
1735
1736     @param result: the data as returned from the rpc call
1737     @type nodes: list
1738     @param nodes: the list of nodes we made the call to
1739     @type failmsg: str
1740     @param failmsg: the identifier to be used for logging
1741
1742     """
1743     failed = []
1744     success = []
1745
1746     for node in nodes:
1747       msg = result[node].fail_msg
1748       if msg:
1749         failed.append(node)
1750         logging.error("RPC call %s (%s) failed on node %s: %s",
1751                       result[node].call, failmsg, node, msg)
1752       else:
1753         success.append(node)
1754
1755     # +1 for the master node
1756     if (len(success) + 1) < len(failed):
1757       # TODO: Handle failing nodes
1758       logging.error("More than half of the nodes failed")
1759
1760   def _GetNodeIp(self):
1761     """Helper for returning the node name/ip list.
1762
1763     @rtype: (list, list)
1764     @return: a tuple of two lists, the first one with the node
1765         names and the second one with the node addresses
1766
1767     """
1768     # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1769     name_list = self._nodes.keys()
1770     addr_list = [self._nodes[name] for name in name_list]
1771     return name_list, addr_list
1772
1773   def _UpdateJobQueueFile(self, file_name, data, replicate):
1774     """Writes a file locally and then replicates it to all nodes.
1775
1776     This function will replace the contents of a file on the local
1777     node and then replicate it to all the other nodes we have.
1778
1779     @type file_name: str
1780     @param file_name: the path of the file to be replicated
1781     @type data: str
1782     @param data: the new contents of the file
1783     @type replicate: boolean
1784     @param replicate: whether to spread the changes to the remote nodes
1785
1786     """
1787     getents = runtime.GetEnts()
1788     utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1789                     gid=getents.masterd_gid)
1790
1791     if replicate:
1792       names, addrs = self._GetNodeIp()
1793       result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
1794       self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1795
1796   def _RenameFilesUnlocked(self, rename):
1797     """Renames a file locally and then replicate the change.
1798
1799     This function will rename a file in the local queue directory
1800     and then replicate this rename to all the other nodes we have.
1801
1802     @type rename: list of (old, new)
1803     @param rename: List containing tuples mapping old to new names
1804
1805     """
1806     # Rename them locally
1807     for old, new in rename:
1808       utils.RenameFile(old, new, mkdir=True)
1809
1810     # ... and on all nodes
1811     names, addrs = self._GetNodeIp()
1812     result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
1813     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1814
1815   def _NewSerialsUnlocked(self, count):
1816     """Generates a new job identifier.
1817
1818     Job identifiers are unique during the lifetime of a cluster.
1819
1820     @type count: integer
1821     @param count: how many serials to return
1822     @rtype: list of int
1823     @return: a list of job identifiers.
1824
1825     """
1826     assert ht.TPositiveInt(count)
1827
1828     # New number
1829     serial = self._last_serial + count
1830
1831     # Write to file
1832     self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
1833                              "%s\n" % serial, True)
1834
1835     result = [jstore.FormatJobID(v)
1836               for v in range(self._last_serial + 1, serial + 1)]
1837
1838     # Keep it only if we were able to write the file
1839     self._last_serial = serial
1840
1841     assert len(result) == count
1842
1843     return result
1844
1845   @staticmethod
1846   def _GetJobPath(job_id):
1847     """Returns the job file for a given job id.
1848
1849     @type job_id: str
1850     @param job_id: the job identifier
1851     @rtype: str
1852     @return: the path to the job file
1853
1854     """
1855     return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
1856
1857   @staticmethod
1858   def _GetArchivedJobPath(job_id):
1859     """Returns the archived job file for a give job id.
1860
1861     @type job_id: str
1862     @param job_id: the job identifier
1863     @rtype: str
1864     @return: the path to the archived job file
1865
1866     """
1867     return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
1868                           jstore.GetArchiveDirectory(job_id),
1869                           "job-%s" % job_id)
1870
1871   @staticmethod
1872   def _DetermineJobDirectories(archived):
1873     result = [pathutils.QUEUE_DIR]
1874
1875     if archived:
1876       archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
1877       result.extend(map(compat.partial(utils.PathJoin, archive_path),
1878                         utils.ListVisibleFiles(archive_path)))
1879
1880     return result
1881
1882   @classmethod
1883   def _GetJobIDsUnlocked(cls, sort=True, archived=False):
1884     """Return all known job IDs.
1885
1886     The method only looks at disk because it's a requirement that all
1887     jobs are present on disk (so in the _memcache we don't have any
1888     extra IDs).
1889
1890     @type sort: boolean
1891     @param sort: perform sorting on the returned job ids
1892     @rtype: list
1893     @return: the list of job IDs
1894
1895     """
1896     jlist = []
1897
1898     for path in cls._DetermineJobDirectories(archived):
1899       for filename in utils.ListVisibleFiles(path):
1900         m = constants.JOB_FILE_RE.match(filename)
1901         if m:
1902           jlist.append(int(m.group(1)))
1903
1904     if sort:
1905       jlist.sort()
1906     return jlist
1907
1908   def _LoadJobUnlocked(self, job_id):
1909     """Loads a job from the disk or memory.
1910
1911     Given a job id, this will return the cached job object if
1912     existing, or try to load the job from the disk. If loading from
1913     disk, it will also add the job to the cache.
1914
1915     @type job_id: int
1916     @param job_id: the job id
1917     @rtype: L{_QueuedJob} or None
1918     @return: either None or the job object
1919
1920     """
1921     job = self._memcache.get(job_id, None)
1922     if job:
1923       logging.debug("Found job %s in memcache", job_id)
1924       assert job.writable, "Found read-only job in memcache"
1925       return job
1926
1927     try:
1928       job = self._LoadJobFromDisk(job_id, False)
1929       if job is None:
1930         return job
1931     except errors.JobFileCorrupted:
1932       old_path = self._GetJobPath(job_id)
1933       new_path = self._GetArchivedJobPath(job_id)
1934       if old_path == new_path:
1935         # job already archived (future case)
1936         logging.exception("Can't parse job %s", job_id)
1937       else:
1938         # non-archived case
1939         logging.exception("Can't parse job %s, will archive.", job_id)
1940         self._RenameFilesUnlocked([(old_path, new_path)])
1941       return None
1942
1943     assert job.writable, "Job just loaded is not writable"
1944
1945     self._memcache[job_id] = job
1946     logging.debug("Added job %s to the cache", job_id)
1947     return job
1948
1949   def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
1950     """Load the given job file from disk.
1951
1952     Given a job file, read, load and restore it in a _QueuedJob format.
1953
1954     @type job_id: int
1955     @param job_id: job identifier
1956     @type try_archived: bool
1957     @param try_archived: Whether to try loading an archived job
1958     @rtype: L{_QueuedJob} or None
1959     @return: either None or the job object
1960
1961     """
1962     path_functions = [(self._GetJobPath, False)]
1963
1964     if try_archived:
1965       path_functions.append((self._GetArchivedJobPath, True))
1966
1967     raw_data = None
1968     archived = None
1969
1970     for (fn, archived) in path_functions:
1971       filepath = fn(job_id)
1972       logging.debug("Loading job from %s", filepath)
1973       try:
1974         raw_data = utils.ReadFile(filepath)
1975       except EnvironmentError, err:
1976         if err.errno != errno.ENOENT:
1977           raise
1978       else:
1979         break
1980
1981     if not raw_data:
1982       return None
1983
1984     if writable is None:
1985       writable = not archived
1986
1987     try:
1988       data = serializer.LoadJson(raw_data)
1989       job = _QueuedJob.Restore(self, data, writable, archived)
1990     except Exception, err: # pylint: disable=W0703
1991       raise errors.JobFileCorrupted(err)
1992
1993     return job
1994
1995   def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
1996     """Load the given job file from disk.
1997
1998     Given a job file, read, load and restore it in a _QueuedJob format.
1999     In case of error reading the job, it gets returned as None, and the
2000     exception is logged.
2001
2002     @type job_id: int
2003     @param job_id: job identifier
2004     @type try_archived: bool
2005     @param try_archived: Whether to try loading an archived job
2006     @rtype: L{_QueuedJob} or None
2007     @return: either None or the job object
2008
2009     """
2010     try:
2011       return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
2012     except (errors.JobFileCorrupted, EnvironmentError):
2013       logging.exception("Can't load/parse job %s", job_id)
2014       return None
2015
2016   def _UpdateQueueSizeUnlocked(self):
2017     """Update the queue size.
2018
2019     """
2020     self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
2021
2022   @locking.ssynchronized(_LOCK)
2023   @_RequireOpenQueue
2024   def SetDrainFlag(self, drain_flag):
2025     """Sets the drain flag for the queue.
2026
2027     @type drain_flag: boolean
2028     @param drain_flag: Whether to set or unset the drain flag
2029
2030     """
2031     jstore.SetDrainFlag(drain_flag)
2032
2033     self._drained = drain_flag
2034
2035     return True
2036
2037   @_RequireOpenQueue
2038   def _SubmitJobUnlocked(self, job_id, ops):
2039     """Create and store a new job.
2040
2041     This enters the job into our job queue and also puts it on the new
2042     queue, in order for it to be picked up by the queue processors.
2043
2044     @type job_id: job ID
2045     @param job_id: the job ID for the new job
2046     @type ops: list
2047     @param ops: The list of OpCodes that will become the new job.
2048     @rtype: L{_QueuedJob}
2049     @return: the job object to be queued
2050     @raise errors.JobQueueFull: if the job queue has too many jobs in it
2051     @raise errors.GenericError: If an opcode is not valid
2052
2053     """
2054     if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
2055       raise errors.JobQueueFull()
2056
2057     job = _QueuedJob(self, job_id, ops, True)
2058
2059     # Check priority
2060     for idx, op in enumerate(job.ops):
2061       if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
2062         allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2063         raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
2064                                   " are %s" % (idx, op.priority, allowed))
2065
2066       dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
2067       if not opcodes.TNoRelativeJobDependencies(dependencies):
2068         raise errors.GenericError("Opcode %s has invalid dependencies, must"
2069                                   " match %s: %s" %
2070                                   (idx, opcodes.TNoRelativeJobDependencies,
2071                                    dependencies))
2072
2073     # Write to disk
2074     self.UpdateJobUnlocked(job)
2075
2076     self._queue_size += 1
2077
2078     logging.debug("Adding new job %s to the cache", job_id)
2079     self._memcache[job_id] = job
2080
2081     return job
2082
2083   @locking.ssynchronized(_LOCK)
2084   @_RequireOpenQueue
2085   @_RequireNonDrainedQueue
2086   def SubmitJob(self, ops):
2087     """Create and store a new job.
2088
2089     @see: L{_SubmitJobUnlocked}
2090
2091     """
2092     (job_id, ) = self._NewSerialsUnlocked(1)
2093     self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
2094     return job_id
2095
2096   @locking.ssynchronized(_LOCK)
2097   @_RequireOpenQueue
2098   @_RequireNonDrainedQueue
2099   def SubmitManyJobs(self, jobs):
2100     """Create and store multiple jobs.
2101
2102     @see: L{_SubmitJobUnlocked}
2103
2104     """
2105     all_job_ids = self._NewSerialsUnlocked(len(jobs))
2106
2107     (results, added_jobs) = \
2108       self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])
2109
2110     self._EnqueueJobsUnlocked(added_jobs)
2111
2112     return results
2113
2114   @staticmethod
2115   def _FormatSubmitError(msg, ops):
2116     """Formats errors which occurred while submitting a job.
2117
2118     """
2119     return ("%s; opcodes %s" %
2120             (msg, utils.CommaJoin(op.Summary() for op in ops)))
2121
2122   @staticmethod
2123   def _ResolveJobDependencies(resolve_fn, deps):
2124     """Resolves relative job IDs in dependencies.
2125
2126     @type resolve_fn: callable
2127     @param resolve_fn: Function to resolve a relative job ID
2128     @type deps: list
2129     @param deps: Dependencies
2130     @rtype: tuple; (boolean, string or list)
2131     @return: If successful (first tuple item), the returned list contains
2132       resolved job IDs along with the requested status; if not successful,
2133       the second element is an error message
2134
2135     """
2136     result = []
2137
2138     for (dep_job_id, dep_status) in deps:
2139       if ht.TRelativeJobId(dep_job_id):
2140         assert ht.TInt(dep_job_id) and dep_job_id < 0
2141         try:
2142           job_id = resolve_fn(dep_job_id)
2143         except IndexError:
2144           # Abort
2145           return (False, "Unable to resolve relative job ID %s" % dep_job_id)
2146       else:
2147         job_id = dep_job_id
2148
2149       result.append((job_id, dep_status))
2150
2151     return (True, result)
2152
2153   def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
2154     """Create and store multiple jobs.
2155
2156     @see: L{_SubmitJobUnlocked}
2157
2158     """
2159     results = []
2160     added_jobs = []
2161
2162     def resolve_fn(job_idx, reljobid):
2163       assert reljobid < 0
2164       return (previous_job_ids + job_ids[:job_idx])[reljobid]
2165
2166     for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
2167       for op in ops:
2168         if getattr(op, opcodes.DEPEND_ATTR, None):
2169           (status, data) = \
2170             self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
2171                                          op.depends)
2172           if not status:
2173             # Abort resolving dependencies
2174             assert ht.TNonEmptyString(data), "No error message"
2175             break
2176           # Use resolved dependencies
2177           op.depends = data
2178       else:
2179         try:
2180           job = self._SubmitJobUnlocked(job_id, ops)
2181         except errors.GenericError, err:
2182           status = False
2183           data = self._FormatSubmitError(str(err), ops)
2184         else:
2185           status = True
2186           data = job_id
2187           added_jobs.append(job)
2188
2189       results.append((status, data))
2190
2191     return (results, added_jobs)
2192
2193   @locking.ssynchronized(_LOCK)
2194   def _EnqueueJobs(self, jobs):
2195     """Helper function to add jobs to worker pool's queue.
2196
2197     @type jobs: list
2198     @param jobs: List of all jobs
2199
2200     """
2201     return self._EnqueueJobsUnlocked(jobs)
2202
2203   def _EnqueueJobsUnlocked(self, jobs):
2204     """Helper function to add jobs to worker pool's queue.
2205
2206     @type jobs: list
2207     @param jobs: List of all jobs
2208
2209     """
2210     assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
2211     self._wpool.AddManyTasks([(job, ) for job in jobs],
2212                              priority=[job.CalcPriority() for job in jobs])
2213
2214   def _GetJobStatusForDependencies(self, job_id):
2215     """Gets the status of a job for dependencies.
2216
2217     @type job_id: int
2218     @param job_id: Job ID
2219     @raise errors.JobLost: If job can't be found
2220
2221     """
2222     # Not using in-memory cache as doing so would require an exclusive lock
2223
2224     # Try to load from disk
2225     job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2226
2227     assert not job.writable, "Got writable job" # pylint: disable=E1101
2228
2229     if job:
2230       return job.CalcStatus()
2231
2232     raise errors.JobLost("Job %s not found" % job_id)
2233
2234   @_RequireOpenQueue
2235   def UpdateJobUnlocked(self, job, replicate=True):
2236     """Update a job's on disk storage.
2237
2238     After a job has been modified, this function needs to be called in
2239     order to write the changes to disk and replicate them to the other
2240     nodes.
2241
2242     @type job: L{_QueuedJob}
2243     @param job: the changed job
2244     @type replicate: boolean
2245     @param replicate: whether to replicate the change to remote nodes
2246
2247     """
2248     if __debug__:
2249       finalized = job.CalcStatus() in constants.JOBS_FINALIZED
2250       assert (finalized ^ (job.end_timestamp is None))
2251       assert job.writable, "Can't update read-only job"
2252       assert not job.archived, "Can't update archived job"
2253
2254     filename = self._GetJobPath(job.id)
2255     data = serializer.DumpJson(job.Serialize())
2256     logging.debug("Writing job %s to %s", job.id, filename)
2257     self._UpdateJobQueueFile(filename, data, replicate)
2258
2259   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
2260                         timeout):
2261     """Waits for changes in a job.
2262
2263     @type job_id: int
2264     @param job_id: Job identifier
2265     @type fields: list of strings
2266     @param fields: Which fields to check for changes
2267     @type prev_job_info: list or None
2268     @param prev_job_info: Last job information returned
2269     @type prev_log_serial: int
2270     @param prev_log_serial: Last job message serial number
2271     @type timeout: float
2272     @param timeout: maximum time to wait in seconds
2273     @rtype: tuple (job info, log entries)
2274     @return: a tuple of the job information as required via
2275         the fields parameter, and the log entries as a list
2276
2277         if the job has not changed and the timeout has expired,
2278         we instead return a special value,
2279         L{constants.JOB_NOTCHANGED}, which should be interpreted
2280         as such by the clients
2281
2282     """
2283     load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
2284                              writable=False)
2285
2286     helper = _WaitForJobChangesHelper()
2287
2288     return helper(self._GetJobPath(job_id), load_fn,
2289                   fields, prev_job_info, prev_log_serial, timeout)
2290
2291   @locking.ssynchronized(_LOCK)
2292   @_RequireOpenQueue
2293   def CancelJob(self, job_id):
2294     """Cancels a job.
2295
2296     This will only succeed if the job has not started yet.
2297
2298     @type job_id: int
2299     @param job_id: job ID of job to be cancelled.
2300
2301     """
2302     logging.info("Cancelling job %s", job_id)
2303
2304     job = self._LoadJobUnlocked(job_id)
2305     if not job:
2306       logging.debug("Job %s not found", job_id)
2307       return (False, "Job %s not found" % job_id)
2308
2309     assert job.writable, "Can't cancel read-only job"
2310     assert not job.archived, "Can't cancel archived job"
2311
2312     (success, msg) = job.Cancel()
2313
2314     if success:
2315       # If the job was finalized (e.g. cancelled), this is the final write
2316       # allowed. The job can be archived anytime.
2317       self.UpdateJobUnlocked(job)
2318
2319     return (success, msg)
2320
2321   @_RequireOpenQueue
2322   def _ArchiveJobsUnlocked(self, jobs):
2323     """Archives jobs.
2324
2325     @type jobs: list of L{_QueuedJob}
2326     @param jobs: Job objects
2327     @rtype: int
2328     @return: Number of archived jobs
2329
2330     """
2331     archive_jobs = []
2332     rename_files = []
2333     for job in jobs:
2334       assert job.writable, "Can't archive read-only job"
2335       assert not job.archived, "Can't cancel archived job"
2336
2337       if job.CalcStatus() not in constants.JOBS_FINALIZED:
2338         logging.debug("Job %s is not yet done", job.id)
2339         continue
2340
2341       archive_jobs.append(job)
2342
2343       old = self._GetJobPath(job.id)
2344       new = self._GetArchivedJobPath(job.id)
2345       rename_files.append((old, new))
2346
2347     # TODO: What if 1..n files fail to rename?
2348     self._RenameFilesUnlocked(rename_files)
2349
2350     logging.debug("Successfully archived job(s) %s",
2351                   utils.CommaJoin(job.id for job in archive_jobs))
2352
2353     # Since we haven't quite checked, above, if we succeeded or failed renaming
2354     # the files, we update the cached queue size from the filesystem. When we
2355     # get around to fix the TODO: above, we can use the number of actually
2356     # archived jobs to fix this.
2357     self._UpdateQueueSizeUnlocked()
2358     return len(archive_jobs)
2359
2360   @locking.ssynchronized(_LOCK)
2361   @_RequireOpenQueue
2362   def ArchiveJob(self, job_id):
2363     """Archives a job.
2364
2365     This is just a wrapper over L{_ArchiveJobsUnlocked}.
2366
2367     @type job_id: int
2368     @param job_id: Job ID of job to be archived.
2369     @rtype: bool
2370     @return: Whether job was archived
2371
2372     """
2373     logging.info("Archiving job %s", job_id)
2374
2375     job = self._LoadJobUnlocked(job_id)
2376     if not job:
2377       logging.debug("Job %s not found", job_id)
2378       return False
2379
2380     return self._ArchiveJobsUnlocked([job]) == 1
2381
2382   @locking.ssynchronized(_LOCK)
2383   @_RequireOpenQueue
2384   def AutoArchiveJobs(self, age, timeout):
2385     """Archives all jobs based on age.
2386
2387     The method will archive all jobs which are older than the age
2388     parameter. For jobs that don't have an end timestamp, the start
2389     timestamp will be considered. The special '-1' age will cause
2390     archival of all jobs (that are not running or queued).
2391
2392     @type age: int
2393     @param age: the minimum age in seconds
2394
2395     """
2396     logging.info("Archiving jobs with age more than %s seconds", age)
2397
2398     now = time.time()
2399     end_time = now + timeout
2400     archived_count = 0
2401     last_touched = 0
2402
2403     all_job_ids = self._GetJobIDsUnlocked()
2404     pending = []
2405     for idx, job_id in enumerate(all_job_ids):
2406       last_touched = idx + 1
2407
2408       # Not optimal because jobs could be pending
2409       # TODO: Measure average duration for job archival and take number of
2410       # pending jobs into account.
2411       if time.time() > end_time:
2412         break
2413
2414       # Returns None if the job failed to load
2415       job = self._LoadJobUnlocked(job_id)
2416       if job:
2417         if job.end_timestamp is None:
2418           if job.start_timestamp is None:
2419             job_age = job.received_timestamp
2420           else:
2421             job_age = job.start_timestamp
2422         else:
2423           job_age = job.end_timestamp
2424
2425         if age == -1 or now - job_age[0] > age:
2426           pending.append(job)
2427
2428           # Archive 10 jobs at a time
2429           if len(pending) >= 10:
2430             archived_count += self._ArchiveJobsUnlocked(pending)
2431             pending = []
2432
2433     if pending:
2434       archived_count += self._ArchiveJobsUnlocked(pending)
2435
2436     return (archived_count, len(all_job_ids) - last_touched)
2437
2438   def _Query(self, fields, qfilter):
2439     qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
2440                        namefield="id")
2441
2442     # Archived jobs are only looked at if the "archived" field is referenced
2443     # either as a requested field or in the filter. By default archived jobs
2444     # are ignored.
2445     include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())
2446
2447     job_ids = qobj.RequestedNames()
2448
2449     list_all = (job_ids is None)
2450
2451     if list_all:
2452       # Since files are added to/removed from the queue atomically, there's no
2453       # risk of getting the job ids in an inconsistent state.
2454       job_ids = self._GetJobIDsUnlocked(archived=include_archived)
2455
2456     jobs = []
2457
2458     for job_id in job_ids:
2459       job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2460       if job is not None or not list_all:
2461         jobs.append((job_id, job))
2462
2463     return (qobj, jobs, list_all)
2464
2465   def QueryJobs(self, fields, qfilter):
2466     """Returns a list of jobs in queue.
2467
2468     @type fields: sequence
2469     @param fields: List of wanted fields
2470     @type qfilter: None or query2 filter (list)
2471     @param qfilter: Query filter
2472
2473     """
2474     (qobj, ctx, _) = self._Query(fields, qfilter)
2475
2476     return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
2477
2478   def OldStyleQueryJobs(self, job_ids, fields):
2479     """Returns a list of jobs in queue.
2480
2481     @type job_ids: list
2482     @param job_ids: sequence of job identifiers or None for all
2483     @type fields: list
2484     @param fields: names of fields to return
2485     @rtype: list
2486     @return: list one element per job, each element being list with
2487         the requested fields
2488
2489     """
2490     # backwards compat:
2491     job_ids = [int(jid) for jid in job_ids]
2492     qfilter = qlang.MakeSimpleFilter("id", job_ids)
2493
2494     (qobj, ctx, _) = self._Query(fields, qfilter)
2495
2496     return qobj.OldStyleQuery(ctx, sort_by_name=False)
2497
2498   @locking.ssynchronized(_LOCK)
2499   def PrepareShutdown(self):
2500     """Prepare to stop the job queue.
2501
2502     Disables execution of jobs in the workerpool and returns whether there are
2503     any jobs currently running. If the latter is the case, the job queue is not
2504     yet ready for shutdown. Once this function returns C{True} L{Shutdown} can
2505     be called without interfering with any job. Queued and unfinished jobs will
2506     be resumed next time.
2507
2508     Once this function has been called no new job submissions will be accepted
2509     (see L{_RequireNonDrainedQueue}).
2510
2511     @rtype: bool
2512     @return: Whether there are any running jobs
2513
2514     """
2515     if self._accepting_jobs:
2516       self._accepting_jobs = False
2517
2518       # Tell worker pool to stop processing pending tasks
2519       self._wpool.SetActive(False)
2520
2521     return self._wpool.HasRunningTasks()
2522
2523   @locking.ssynchronized(_LOCK)
2524   @_RequireOpenQueue
2525   def Shutdown(self):
2526     """Stops the job queue.
2527
2528     This shutdowns all the worker threads an closes the queue.
2529
2530     """
2531     self._wpool.TerminateWorkers()
2532
2533     self._queue_filelock.Close()
2534     self._queue_filelock = None