Merge branch 'stable-2.6-hotplug' into stable-2.6-ippool-hotplug-esi
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import logging
33 import errno
34 import time
35 import weakref
36 import threading
37 import itertools
38
39 try:
40   # pylint: disable=E0611
41   from pyinotify import pyinotify
42 except ImportError:
43   import pyinotify
44
45 from ganeti import asyncnotifier
46 from ganeti import constants
47 from ganeti import serializer
48 from ganeti import workerpool
49 from ganeti import locking
50 from ganeti import opcodes
51 from ganeti import errors
52 from ganeti import mcpu
53 from ganeti import utils
54 from ganeti import jstore
55 from ganeti import rpc
56 from ganeti import runtime
57 from ganeti import netutils
58 from ganeti import compat
59 from ganeti import ht
60 from ganeti import query
61 from ganeti import qlang
62
63
64 JOBQUEUE_THREADS = 25
65 JOBS_PER_ARCHIVE_DIRECTORY = 10000
66
67 # member lock names to be passed to @ssynchronized decorator
68 _LOCK = "_lock"
69 _QUEUE = "_queue"
70
71
72 class CancelJob(Exception):
73   """Special exception to cancel a job.
74
75   """
76
77
78 class QueueShutdown(Exception):
79   """Special exception to abort a job when the job queue is shutting down.
80
81   """
82
83
84 def TimeStampNow():
85   """Returns the current timestamp.
86
87   @rtype: tuple
88   @return: the current time in the (seconds, microseconds) format
89
90   """
91   return utils.SplitTime(time.time())
92
93
94 class _SimpleJobQuery:
95   """Wrapper for job queries.
96
97   Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.
98
99   """
100   def __init__(self, fields):
101     """Initializes this class.
102
103     """
104     self._query = query.Query(query.JOB_FIELDS, fields)
105
106   def __call__(self, job):
107     """Executes a job query using cached field list.
108
109     """
110     return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
111
112
113 class _QueuedOpCode(object):
114   """Encapsulates an opcode object.
115
116   @ivar log: holds the execution log and consists of tuples
117   of the form C{(log_serial, timestamp, level, message)}
118   @ivar input: the OpCode we encapsulate
119   @ivar status: the current status
120   @ivar result: the result of the LU execution
121   @ivar start_timestamp: timestamp for the start of the execution
122   @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
123   @ivar stop_timestamp: timestamp for the end of the execution
124
125   """
126   __slots__ = ["input", "status", "result", "log", "priority",
127                "start_timestamp", "exec_timestamp", "end_timestamp",
128                "__weakref__"]
129
130   def __init__(self, op):
131     """Initializes instances of this class.
132
133     @type op: L{opcodes.OpCode}
134     @param op: the opcode we encapsulate
135
136     """
137     self.input = op
138     self.status = constants.OP_STATUS_QUEUED
139     self.result = None
140     self.log = []
141     self.start_timestamp = None
142     self.exec_timestamp = None
143     self.end_timestamp = None
144
145     # Get initial priority (it might change during the lifetime of this opcode)
146     self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
147
148   @classmethod
149   def Restore(cls, state):
150     """Restore the _QueuedOpCode from the serialized form.
151
152     @type state: dict
153     @param state: the serialized state
154     @rtype: _QueuedOpCode
155     @return: a new _QueuedOpCode instance
156
157     """
158     obj = _QueuedOpCode.__new__(cls)
159     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
160     obj.status = state["status"]
161     obj.result = state["result"]
162     obj.log = state["log"]
163     obj.start_timestamp = state.get("start_timestamp", None)
164     obj.exec_timestamp = state.get("exec_timestamp", None)
165     obj.end_timestamp = state.get("end_timestamp", None)
166     obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
167     return obj
168
169   def Serialize(self):
170     """Serializes this _QueuedOpCode.
171
172     @rtype: dict
173     @return: the dictionary holding the serialized state
174
175     """
176     return {
177       "input": self.input.__getstate__(),
178       "status": self.status,
179       "result": self.result,
180       "log": self.log,
181       "start_timestamp": self.start_timestamp,
182       "exec_timestamp": self.exec_timestamp,
183       "end_timestamp": self.end_timestamp,
184       "priority": self.priority,
185       }
186
187
188 class _QueuedJob(object):
189   """In-memory job representation.
190
191   This is what we use to track the user-submitted jobs. Locking must
192   be taken care of by users of this class.
193
194   @type queue: L{JobQueue}
195   @ivar queue: the parent queue
196   @ivar id: the job ID
197   @type ops: list
198   @ivar ops: the list of _QueuedOpCode that constitute the job
199   @type log_serial: int
200   @ivar log_serial: holds the index for the next log entry
201   @ivar received_timestamp: the timestamp for when the job was received
202   @ivar start_timestmap: the timestamp for start of execution
203   @ivar end_timestamp: the timestamp for end of execution
204   @ivar writable: Whether the job is allowed to be modified
205
206   """
207   # pylint: disable=W0212
208   __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
209                "received_timestamp", "start_timestamp", "end_timestamp",
210                "__weakref__", "processor_lock", "writable"]
211
212   def __init__(self, queue, job_id, ops, writable):
213     """Constructor for the _QueuedJob.
214
215     @type queue: L{JobQueue}
216     @param queue: our parent queue
217     @type job_id: job_id
218     @param job_id: our job id
219     @type ops: list
220     @param ops: the list of opcodes we hold, which will be encapsulated
221         in _QueuedOpCodes
222     @type writable: bool
223     @param writable: Whether job can be modified
224
225     """
226     if not ops:
227       raise errors.GenericError("A job needs at least one opcode")
228
229     self.queue = queue
230     self.id = job_id
231     self.ops = [_QueuedOpCode(op) for op in ops]
232     self.log_serial = 0
233     self.received_timestamp = TimeStampNow()
234     self.start_timestamp = None
235     self.end_timestamp = None
236
237     self._InitInMemory(self, writable)
238
239   @staticmethod
240   def _InitInMemory(obj, writable):
241     """Initializes in-memory variables.
242
243     """
244     obj.writable = writable
245     obj.ops_iter = None
246     obj.cur_opctx = None
247
248     # Read-only jobs are not processed and therefore don't need a lock
249     if writable:
250       obj.processor_lock = threading.Lock()
251     else:
252       obj.processor_lock = None
253
254   def __repr__(self):
255     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
256               "id=%s" % self.id,
257               "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
258
259     return "<%s at %#x>" % (" ".join(status), id(self))
260
261   @classmethod
262   def Restore(cls, queue, state, writable):
263     """Restore a _QueuedJob from serialized state:
264
265     @type queue: L{JobQueue}
266     @param queue: to which queue the restored job belongs
267     @type state: dict
268     @param state: the serialized state
269     @type writable: bool
270     @param writable: Whether job can be modified
271     @rtype: _JobQueue
272     @return: the restored _JobQueue instance
273
274     """
275     obj = _QueuedJob.__new__(cls)
276     obj.queue = queue
277     obj.id = state["id"]
278     obj.received_timestamp = state.get("received_timestamp", None)
279     obj.start_timestamp = state.get("start_timestamp", None)
280     obj.end_timestamp = state.get("end_timestamp", None)
281
282     obj.ops = []
283     obj.log_serial = 0
284     for op_state in state["ops"]:
285       op = _QueuedOpCode.Restore(op_state)
286       for log_entry in op.log:
287         obj.log_serial = max(obj.log_serial, log_entry[0])
288       obj.ops.append(op)
289
290     cls._InitInMemory(obj, writable)
291
292     return obj
293
294   def Serialize(self):
295     """Serialize the _JobQueue instance.
296
297     @rtype: dict
298     @return: the serialized state
299
300     """
301     return {
302       "id": self.id,
303       "ops": [op.Serialize() for op in self.ops],
304       "start_timestamp": self.start_timestamp,
305       "end_timestamp": self.end_timestamp,
306       "received_timestamp": self.received_timestamp,
307       }
308
309   def CalcStatus(self):
310     """Compute the status of this job.
311
312     This function iterates over all the _QueuedOpCodes in the job and
313     based on their status, computes the job status.
314
315     The algorithm is:
316       - if we find a cancelled, or finished with error, the job
317         status will be the same
318       - otherwise, the last opcode with the status one of:
319           - waitlock
320           - canceling
321           - running
322
323         will determine the job status
324
325       - otherwise, it means either all opcodes are queued, or success,
326         and the job status will be the same
327
328     @return: the job status
329
330     """
331     status = constants.JOB_STATUS_QUEUED
332
333     all_success = True
334     for op in self.ops:
335       if op.status == constants.OP_STATUS_SUCCESS:
336         continue
337
338       all_success = False
339
340       if op.status == constants.OP_STATUS_QUEUED:
341         pass
342       elif op.status == constants.OP_STATUS_WAITING:
343         status = constants.JOB_STATUS_WAITING
344       elif op.status == constants.OP_STATUS_RUNNING:
345         status = constants.JOB_STATUS_RUNNING
346       elif op.status == constants.OP_STATUS_CANCELING:
347         status = constants.JOB_STATUS_CANCELING
348         break
349       elif op.status == constants.OP_STATUS_ERROR:
350         status = constants.JOB_STATUS_ERROR
351         # The whole job fails if one opcode failed
352         break
353       elif op.status == constants.OP_STATUS_CANCELED:
354         status = constants.OP_STATUS_CANCELED
355         break
356
357     if all_success:
358       status = constants.JOB_STATUS_SUCCESS
359
360     return status
361
362   def CalcPriority(self):
363     """Gets the current priority for this job.
364
365     Only unfinished opcodes are considered. When all are done, the default
366     priority is used.
367
368     @rtype: int
369
370     """
371     priorities = [op.priority for op in self.ops
372                   if op.status not in constants.OPS_FINALIZED]
373
374     if not priorities:
375       # All opcodes are done, assume default priority
376       return constants.OP_PRIO_DEFAULT
377
378     return min(priorities)
379
380   def GetLogEntries(self, newer_than):
381     """Selectively returns the log entries.
382
383     @type newer_than: None or int
384     @param newer_than: if this is None, return all log entries,
385         otherwise return only the log entries with serial higher
386         than this value
387     @rtype: list
388     @return: the list of the log entries selected
389
390     """
391     if newer_than is None:
392       serial = -1
393     else:
394       serial = newer_than
395
396     entries = []
397     for op in self.ops:
398       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
399
400     return entries
401
402   def GetInfo(self, fields):
403     """Returns information about a job.
404
405     @type fields: list
406     @param fields: names of fields to return
407     @rtype: list
408     @return: list with one element for each field
409     @raise errors.OpExecError: when an invalid field
410         has been passed
411
412     """
413     return _SimpleJobQuery(fields)(self)
414
415   def MarkUnfinishedOps(self, status, result):
416     """Mark unfinished opcodes with a given status and result.
417
418     This is an utility function for marking all running or waiting to
419     be run opcodes with a given status. Opcodes which are already
420     finalised are not changed.
421
422     @param status: a given opcode status
423     @param result: the opcode result
424
425     """
426     not_marked = True
427     for op in self.ops:
428       if op.status in constants.OPS_FINALIZED:
429         assert not_marked, "Finalized opcodes found after non-finalized ones"
430         continue
431       op.status = status
432       op.result = result
433       not_marked = False
434
435   def Finalize(self):
436     """Marks the job as finalized.
437
438     """
439     self.end_timestamp = TimeStampNow()
440
441   def Cancel(self):
442     """Marks job as canceled/-ing if possible.
443
444     @rtype: tuple; (bool, string)
445     @return: Boolean describing whether job was successfully canceled or marked
446       as canceling and a text message
447
448     """
449     status = self.CalcStatus()
450
451     if status == constants.JOB_STATUS_QUEUED:
452       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
453                              "Job canceled by request")
454       self.Finalize()
455       return (True, "Job %s canceled" % self.id)
456
457     elif status == constants.JOB_STATUS_WAITING:
458       # The worker will notice the new status and cancel the job
459       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
460       return (True, "Job %s will be canceled" % self.id)
461
462     else:
463       logging.debug("Job %s is no longer waiting in the queue", self.id)
464       return (False, "Job %s is no longer waiting in the queue" % self.id)
465
466
467 class _OpExecCallbacks(mcpu.OpExecCbBase):
468   def __init__(self, queue, job, op):
469     """Initializes this class.
470
471     @type queue: L{JobQueue}
472     @param queue: Job queue
473     @type job: L{_QueuedJob}
474     @param job: Job object
475     @type op: L{_QueuedOpCode}
476     @param op: OpCode
477
478     """
479     assert queue, "Queue is missing"
480     assert job, "Job is missing"
481     assert op, "Opcode is missing"
482
483     self._queue = queue
484     self._job = job
485     self._op = op
486
487   def _CheckCancel(self):
488     """Raises an exception to cancel the job if asked to.
489
490     """
491     # Cancel here if we were asked to
492     if self._op.status == constants.OP_STATUS_CANCELING:
493       logging.debug("Canceling opcode")
494       raise CancelJob()
495
496     # See if queue is shutting down
497     if not self._queue.AcceptingJobsUnlocked():
498       logging.debug("Queue is shutting down")
499       raise QueueShutdown()
500
501   @locking.ssynchronized(_QUEUE, shared=1)
502   def NotifyStart(self):
503     """Mark the opcode as running, not lock-waiting.
504
505     This is called from the mcpu code as a notifier function, when the LU is
506     finally about to start the Exec() method. Of course, to have end-user
507     visible results, the opcode must be initially (before calling into
508     Processor.ExecOpCode) set to OP_STATUS_WAITING.
509
510     """
511     assert self._op in self._job.ops
512     assert self._op.status in (constants.OP_STATUS_WAITING,
513                                constants.OP_STATUS_CANCELING)
514
515     # Cancel here if we were asked to
516     self._CheckCancel()
517
518     logging.debug("Opcode is now running")
519
520     self._op.status = constants.OP_STATUS_RUNNING
521     self._op.exec_timestamp = TimeStampNow()
522
523     # And finally replicate the job status
524     self._queue.UpdateJobUnlocked(self._job)
525
526   @locking.ssynchronized(_QUEUE, shared=1)
527   def _AppendFeedback(self, timestamp, log_type, log_msg):
528     """Internal feedback append function, with locks
529
530     """
531     self._job.log_serial += 1
532     self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
533     self._queue.UpdateJobUnlocked(self._job, replicate=False)
534
535   def Feedback(self, *args):
536     """Append a log entry.
537
538     """
539     assert len(args) < 3
540
541     if len(args) == 1:
542       log_type = constants.ELOG_MESSAGE
543       log_msg = args[0]
544     else:
545       (log_type, log_msg) = args
546
547     # The time is split to make serialization easier and not lose
548     # precision.
549     timestamp = utils.SplitTime(time.time())
550     self._AppendFeedback(timestamp, log_type, log_msg)
551
552   def CheckCancel(self):
553     """Check whether job has been cancelled.
554
555     """
556     assert self._op.status in (constants.OP_STATUS_WAITING,
557                                constants.OP_STATUS_CANCELING)
558
559     # Cancel here if we were asked to
560     self._CheckCancel()
561
562   def SubmitManyJobs(self, jobs):
563     """Submits jobs for processing.
564
565     See L{JobQueue.SubmitManyJobs}.
566
567     """
568     # Locking is done in job queue
569     return self._queue.SubmitManyJobs(jobs)
570
571
572 class _JobChangesChecker(object):
573   def __init__(self, fields, prev_job_info, prev_log_serial):
574     """Initializes this class.
575
576     @type fields: list of strings
577     @param fields: Fields requested by LUXI client
578     @type prev_job_info: string
579     @param prev_job_info: previous job info, as passed by the LUXI client
580     @type prev_log_serial: string
581     @param prev_log_serial: previous job serial, as passed by the LUXI client
582
583     """
584     self._squery = _SimpleJobQuery(fields)
585     self._prev_job_info = prev_job_info
586     self._prev_log_serial = prev_log_serial
587
588   def __call__(self, job):
589     """Checks whether job has changed.
590
591     @type job: L{_QueuedJob}
592     @param job: Job object
593
594     """
595     assert not job.writable, "Expected read-only job"
596
597     status = job.CalcStatus()
598     job_info = self._squery(job)
599     log_entries = job.GetLogEntries(self._prev_log_serial)
600
601     # Serializing and deserializing data can cause type changes (e.g. from
602     # tuple to list) or precision loss. We're doing it here so that we get
603     # the same modifications as the data received from the client. Without
604     # this, the comparison afterwards might fail without the data being
605     # significantly different.
606     # TODO: we just deserialized from disk, investigate how to make sure that
607     # the job info and log entries are compatible to avoid this further step.
608     # TODO: Doing something like in testutils.py:UnifyValueType might be more
609     # efficient, though floats will be tricky
610     job_info = serializer.LoadJson(serializer.DumpJson(job_info))
611     log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
612
613     # Don't even try to wait if the job is no longer running, there will be
614     # no changes.
615     if (status not in (constants.JOB_STATUS_QUEUED,
616                        constants.JOB_STATUS_RUNNING,
617                        constants.JOB_STATUS_WAITING) or
618         job_info != self._prev_job_info or
619         (log_entries and self._prev_log_serial != log_entries[0][0])):
620       logging.debug("Job %s changed", job.id)
621       return (job_info, log_entries)
622
623     return None
624
625
626 class _JobFileChangesWaiter(object):
627   def __init__(self, filename):
628     """Initializes this class.
629
630     @type filename: string
631     @param filename: Path to job file
632     @raises errors.InotifyError: if the notifier cannot be setup
633
634     """
635     self._wm = pyinotify.WatchManager()
636     self._inotify_handler = \
637       asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
638     self._notifier = \
639       pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
640     try:
641       self._inotify_handler.enable()
642     except Exception:
643       # pyinotify doesn't close file descriptors automatically
644       self._notifier.stop()
645       raise
646
647   def _OnInotify(self, notifier_enabled):
648     """Callback for inotify.
649
650     """
651     if not notifier_enabled:
652       self._inotify_handler.enable()
653
654   def Wait(self, timeout):
655     """Waits for the job file to change.
656
657     @type timeout: float
658     @param timeout: Timeout in seconds
659     @return: Whether there have been events
660
661     """
662     assert timeout >= 0
663     have_events = self._notifier.check_events(timeout * 1000)
664     if have_events:
665       self._notifier.read_events()
666     self._notifier.process_events()
667     return have_events
668
669   def Close(self):
670     """Closes underlying notifier and its file descriptor.
671
672     """
673     self._notifier.stop()
674
675
676 class _JobChangesWaiter(object):
677   def __init__(self, filename):
678     """Initializes this class.
679
680     @type filename: string
681     @param filename: Path to job file
682
683     """
684     self._filewaiter = None
685     self._filename = filename
686
687   def Wait(self, timeout):
688     """Waits for a job to change.
689
690     @type timeout: float
691     @param timeout: Timeout in seconds
692     @return: Whether there have been events
693
694     """
695     if self._filewaiter:
696       return self._filewaiter.Wait(timeout)
697
698     # Lazy setup: Avoid inotify setup cost when job file has already changed.
699     # If this point is reached, return immediately and let caller check the job
700     # file again in case there were changes since the last check. This avoids a
701     # race condition.
702     self._filewaiter = _JobFileChangesWaiter(self._filename)
703
704     return True
705
706   def Close(self):
707     """Closes underlying waiter.
708
709     """
710     if self._filewaiter:
711       self._filewaiter.Close()
712
713
714 class _WaitForJobChangesHelper(object):
715   """Helper class using inotify to wait for changes in a job file.
716
717   This class takes a previous job status and serial, and alerts the client when
718   the current job status has changed.
719
720   """
721   @staticmethod
722   def _CheckForChanges(counter, job_load_fn, check_fn):
723     if counter.next() > 0:
724       # If this isn't the first check the job is given some more time to change
725       # again. This gives better performance for jobs generating many
726       # changes/messages.
727       time.sleep(0.1)
728
729     job = job_load_fn()
730     if not job:
731       raise errors.JobLost()
732
733     result = check_fn(job)
734     if result is None:
735       raise utils.RetryAgain()
736
737     return result
738
739   def __call__(self, filename, job_load_fn,
740                fields, prev_job_info, prev_log_serial, timeout):
741     """Waits for changes on a job.
742
743     @type filename: string
744     @param filename: File on which to wait for changes
745     @type job_load_fn: callable
746     @param job_load_fn: Function to load job
747     @type fields: list of strings
748     @param fields: Which fields to check for changes
749     @type prev_job_info: list or None
750     @param prev_job_info: Last job information returned
751     @type prev_log_serial: int
752     @param prev_log_serial: Last job message serial number
753     @type timeout: float
754     @param timeout: maximum time to wait in seconds
755
756     """
757     counter = itertools.count()
758     try:
759       check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
760       waiter = _JobChangesWaiter(filename)
761       try:
762         return utils.Retry(compat.partial(self._CheckForChanges,
763                                           counter, job_load_fn, check_fn),
764                            utils.RETRY_REMAINING_TIME, timeout,
765                            wait_fn=waiter.Wait)
766       finally:
767         waiter.Close()
768     except (errors.InotifyError, errors.JobLost):
769       return None
770     except utils.RetryTimeout:
771       return constants.JOB_NOTCHANGED
772
773
774 def _EncodeOpError(err):
775   """Encodes an error which occurred while processing an opcode.
776
777   """
778   if isinstance(err, errors.GenericError):
779     to_encode = err
780   else:
781     to_encode = errors.OpExecError(str(err))
782
783   return errors.EncodeException(to_encode)
784
785
786 class _TimeoutStrategyWrapper:
787   def __init__(self, fn):
788     """Initializes this class.
789
790     """
791     self._fn = fn
792     self._next = None
793
794   def _Advance(self):
795     """Gets the next timeout if necessary.
796
797     """
798     if self._next is None:
799       self._next = self._fn()
800
801   def Peek(self):
802     """Returns the next timeout.
803
804     """
805     self._Advance()
806     return self._next
807
808   def Next(self):
809     """Returns the current timeout and advances the internal state.
810
811     """
812     self._Advance()
813     result = self._next
814     self._next = None
815     return result
816
817
818 class _OpExecContext:
819   def __init__(self, op, index, log_prefix, timeout_strategy_factory):
820     """Initializes this class.
821
822     """
823     self.op = op
824     self.index = index
825     self.log_prefix = log_prefix
826     self.summary = op.input.Summary()
827
828     # Create local copy to modify
829     if getattr(op.input, opcodes.DEPEND_ATTR, None):
830       self.jobdeps = op.input.depends[:]
831     else:
832       self.jobdeps = None
833
834     self._timeout_strategy_factory = timeout_strategy_factory
835     self._ResetTimeoutStrategy()
836
837   def _ResetTimeoutStrategy(self):
838     """Creates a new timeout strategy.
839
840     """
841     self._timeout_strategy = \
842       _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
843
844   def CheckPriorityIncrease(self):
845     """Checks whether priority can and should be increased.
846
847     Called when locks couldn't be acquired.
848
849     """
850     op = self.op
851
852     # Exhausted all retries and next round should not use blocking acquire
853     # for locks?
854     if (self._timeout_strategy.Peek() is None and
855         op.priority > constants.OP_PRIO_HIGHEST):
856       logging.debug("Increasing priority")
857       op.priority -= 1
858       self._ResetTimeoutStrategy()
859       return True
860
861     return False
862
863   def GetNextLockTimeout(self):
864     """Returns the next lock acquire timeout.
865
866     """
867     return self._timeout_strategy.Next()
868
869
870 class _JobProcessor(object):
871   (DEFER,
872    WAITDEP,
873    FINISHED) = range(1, 4)
874
875   def __init__(self, queue, opexec_fn, job,
876                _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
877     """Initializes this class.
878
879     """
880     self.queue = queue
881     self.opexec_fn = opexec_fn
882     self.job = job
883     self._timeout_strategy_factory = _timeout_strategy_factory
884
885   @staticmethod
886   def _FindNextOpcode(job, timeout_strategy_factory):
887     """Locates the next opcode to run.
888
889     @type job: L{_QueuedJob}
890     @param job: Job object
891     @param timeout_strategy_factory: Callable to create new timeout strategy
892
893     """
894     # Create some sort of a cache to speed up locating next opcode for future
895     # lookups
896     # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
897     # pending and one for processed ops.
898     if job.ops_iter is None:
899       job.ops_iter = enumerate(job.ops)
900
901     # Find next opcode to run
902     while True:
903       try:
904         (idx, op) = job.ops_iter.next()
905       except StopIteration:
906         raise errors.ProgrammerError("Called for a finished job")
907
908       if op.status == constants.OP_STATUS_RUNNING:
909         # Found an opcode already marked as running
910         raise errors.ProgrammerError("Called for job marked as running")
911
912       opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
913                              timeout_strategy_factory)
914
915       if op.status not in constants.OPS_FINALIZED:
916         return opctx
917
918       # This is a job that was partially completed before master daemon
919       # shutdown, so it can be expected that some opcodes are already
920       # completed successfully (if any did error out, then the whole job
921       # should have been aborted and not resubmitted for processing).
922       logging.info("%s: opcode %s already processed, skipping",
923                    opctx.log_prefix, opctx.summary)
924
925   @staticmethod
926   def _MarkWaitlock(job, op):
927     """Marks an opcode as waiting for locks.
928
929     The job's start timestamp is also set if necessary.
930
931     @type job: L{_QueuedJob}
932     @param job: Job object
933     @type op: L{_QueuedOpCode}
934     @param op: Opcode object
935
936     """
937     assert op in job.ops
938     assert op.status in (constants.OP_STATUS_QUEUED,
939                          constants.OP_STATUS_WAITING)
940
941     update = False
942
943     op.result = None
944
945     if op.status == constants.OP_STATUS_QUEUED:
946       op.status = constants.OP_STATUS_WAITING
947       update = True
948
949     if op.start_timestamp is None:
950       op.start_timestamp = TimeStampNow()
951       update = True
952
953     if job.start_timestamp is None:
954       job.start_timestamp = op.start_timestamp
955       update = True
956
957     assert op.status == constants.OP_STATUS_WAITING
958
959     return update
960
961   @staticmethod
962   def _CheckDependencies(queue, job, opctx):
963     """Checks if an opcode has dependencies and if so, processes them.
964
965     @type queue: L{JobQueue}
966     @param queue: Queue object
967     @type job: L{_QueuedJob}
968     @param job: Job object
969     @type opctx: L{_OpExecContext}
970     @param opctx: Opcode execution context
971     @rtype: bool
972     @return: Whether opcode will be re-scheduled by dependency tracker
973
974     """
975     op = opctx.op
976
977     result = False
978
979     while opctx.jobdeps:
980       (dep_job_id, dep_status) = opctx.jobdeps[0]
981
982       (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
983                                                           dep_status)
984       assert ht.TNonEmptyString(depmsg), "No dependency message"
985
986       logging.info("%s: %s", opctx.log_prefix, depmsg)
987
988       if depresult == _JobDependencyManager.CONTINUE:
989         # Remove dependency and continue
990         opctx.jobdeps.pop(0)
991
992       elif depresult == _JobDependencyManager.WAIT:
993         # Need to wait for notification, dependency tracker will re-add job
994         # to workerpool
995         result = True
996         break
997
998       elif depresult == _JobDependencyManager.CANCEL:
999         # Job was cancelled, cancel this job as well
1000         job.Cancel()
1001         assert op.status == constants.OP_STATUS_CANCELING
1002         break
1003
1004       elif depresult in (_JobDependencyManager.WRONGSTATUS,
1005                          _JobDependencyManager.ERROR):
1006         # Job failed or there was an error, this job must fail
1007         op.status = constants.OP_STATUS_ERROR
1008         op.result = _EncodeOpError(errors.OpExecError(depmsg))
1009         break
1010
1011       else:
1012         raise errors.ProgrammerError("Unknown dependency result '%s'" %
1013                                      depresult)
1014
1015     return result
1016
1017   def _ExecOpCodeUnlocked(self, opctx):
1018     """Processes one opcode and returns the result.
1019
1020     """
1021     op = opctx.op
1022
1023     assert op.status == constants.OP_STATUS_WAITING
1024
1025     timeout = opctx.GetNextLockTimeout()
1026
1027     try:
1028       # Make sure not to hold queue lock while calling ExecOpCode
1029       result = self.opexec_fn(op.input,
1030                               _OpExecCallbacks(self.queue, self.job, op),
1031                               timeout=timeout, priority=op.priority)
1032     except mcpu.LockAcquireTimeout:
1033       assert timeout is not None, "Received timeout for blocking acquire"
1034       logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1035
1036       assert op.status in (constants.OP_STATUS_WAITING,
1037                            constants.OP_STATUS_CANCELING)
1038
1039       # Was job cancelled while we were waiting for the lock?
1040       if op.status == constants.OP_STATUS_CANCELING:
1041         return (constants.OP_STATUS_CANCELING, None)
1042
1043       # Queue is shutting down, return to queued
1044       if not self.queue.AcceptingJobsUnlocked():
1045         return (constants.OP_STATUS_QUEUED, None)
1046
1047       # Stay in waitlock while trying to re-acquire lock
1048       return (constants.OP_STATUS_WAITING, None)
1049     except CancelJob:
1050       logging.exception("%s: Canceling job", opctx.log_prefix)
1051       assert op.status == constants.OP_STATUS_CANCELING
1052       return (constants.OP_STATUS_CANCELING, None)
1053
1054     except QueueShutdown:
1055       logging.exception("%s: Queue is shutting down", opctx.log_prefix)
1056
1057       assert op.status == constants.OP_STATUS_WAITING
1058
1059       # Job hadn't been started yet, so it should return to the queue
1060       return (constants.OP_STATUS_QUEUED, None)
1061
1062     except Exception, err: # pylint: disable=W0703
1063       logging.exception("%s: Caught exception in %s",
1064                         opctx.log_prefix, opctx.summary)
1065       return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1066     else:
1067       logging.debug("%s: %s successful",
1068                     opctx.log_prefix, opctx.summary)
1069       return (constants.OP_STATUS_SUCCESS, result)
1070
1071   def __call__(self, _nextop_fn=None):
1072     """Continues execution of a job.
1073
1074     @param _nextop_fn: Callback function for tests
1075     @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1076       be deferred and C{WAITDEP} if the dependency manager
1077       (L{_JobDependencyManager}) will re-schedule the job when appropriate
1078
1079     """
1080     queue = self.queue
1081     job = self.job
1082
1083     logging.debug("Processing job %s", job.id)
1084
1085     queue.acquire(shared=1)
1086     try:
1087       opcount = len(job.ops)
1088
1089       assert job.writable, "Expected writable job"
1090
1091       # Don't do anything for finalized jobs
1092       if job.CalcStatus() in constants.JOBS_FINALIZED:
1093         return self.FINISHED
1094
1095       # Is a previous opcode still pending?
1096       if job.cur_opctx:
1097         opctx = job.cur_opctx
1098         job.cur_opctx = None
1099       else:
1100         if __debug__ and _nextop_fn:
1101           _nextop_fn()
1102         opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1103
1104       op = opctx.op
1105
1106       # Consistency check
1107       assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1108                                      constants.OP_STATUS_CANCELING)
1109                         for i in job.ops[opctx.index + 1:])
1110
1111       assert op.status in (constants.OP_STATUS_QUEUED,
1112                            constants.OP_STATUS_WAITING,
1113                            constants.OP_STATUS_CANCELING)
1114
1115       assert (op.priority <= constants.OP_PRIO_LOWEST and
1116               op.priority >= constants.OP_PRIO_HIGHEST)
1117
1118       waitjob = None
1119
1120       if op.status != constants.OP_STATUS_CANCELING:
1121         assert op.status in (constants.OP_STATUS_QUEUED,
1122                              constants.OP_STATUS_WAITING)
1123
1124         # Prepare to start opcode
1125         if self._MarkWaitlock(job, op):
1126           # Write to disk
1127           queue.UpdateJobUnlocked(job)
1128
1129         assert op.status == constants.OP_STATUS_WAITING
1130         assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1131         assert job.start_timestamp and op.start_timestamp
1132         assert waitjob is None
1133
1134         # Check if waiting for a job is necessary
1135         waitjob = self._CheckDependencies(queue, job, opctx)
1136
1137         assert op.status in (constants.OP_STATUS_WAITING,
1138                              constants.OP_STATUS_CANCELING,
1139                              constants.OP_STATUS_ERROR)
1140
1141         if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1142                                          constants.OP_STATUS_ERROR)):
1143           logging.info("%s: opcode %s waiting for locks",
1144                        opctx.log_prefix, opctx.summary)
1145
1146           assert not opctx.jobdeps, "Not all dependencies were removed"
1147
1148           queue.release()
1149           try:
1150             (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1151           finally:
1152             queue.acquire(shared=1)
1153
1154           op.status = op_status
1155           op.result = op_result
1156
1157           assert not waitjob
1158
1159         if op.status in (constants.OP_STATUS_WAITING,
1160                          constants.OP_STATUS_QUEUED):
1161           # waiting: Couldn't get locks in time
1162           # queued: Queue is shutting down
1163           assert not op.end_timestamp
1164         else:
1165           # Finalize opcode
1166           op.end_timestamp = TimeStampNow()
1167
1168           if op.status == constants.OP_STATUS_CANCELING:
1169             assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1170                                   for i in job.ops[opctx.index:])
1171           else:
1172             assert op.status in constants.OPS_FINALIZED
1173
1174       if op.status == constants.OP_STATUS_QUEUED:
1175         # Queue is shutting down
1176         assert not waitjob
1177
1178         finalize = False
1179
1180         # Reset context
1181         job.cur_opctx = None
1182
1183         # In no case must the status be finalized here
1184         assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1185
1186       elif op.status == constants.OP_STATUS_WAITING or waitjob:
1187         finalize = False
1188
1189         if not waitjob and opctx.CheckPriorityIncrease():
1190           # Priority was changed, need to update on-disk file
1191           queue.UpdateJobUnlocked(job)
1192
1193         # Keep around for another round
1194         job.cur_opctx = opctx
1195
1196         assert (op.priority <= constants.OP_PRIO_LOWEST and
1197                 op.priority >= constants.OP_PRIO_HIGHEST)
1198
1199         # In no case must the status be finalized here
1200         assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1201
1202       else:
1203         # Ensure all opcodes so far have been successful
1204         assert (opctx.index == 0 or
1205                 compat.all(i.status == constants.OP_STATUS_SUCCESS
1206                            for i in job.ops[:opctx.index]))
1207
1208         # Reset context
1209         job.cur_opctx = None
1210
1211         if op.status == constants.OP_STATUS_SUCCESS:
1212           finalize = False
1213
1214         elif op.status == constants.OP_STATUS_ERROR:
1215           # Ensure failed opcode has an exception as its result
1216           assert errors.GetEncodedError(job.ops[opctx.index].result)
1217
1218           to_encode = errors.OpExecError("Preceding opcode failed")
1219           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1220                                 _EncodeOpError(to_encode))
1221           finalize = True
1222
1223           # Consistency check
1224           assert compat.all(i.status == constants.OP_STATUS_ERROR and
1225                             errors.GetEncodedError(i.result)
1226                             for i in job.ops[opctx.index:])
1227
1228         elif op.status == constants.OP_STATUS_CANCELING:
1229           job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1230                                 "Job canceled by request")
1231           finalize = True
1232
1233         else:
1234           raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1235
1236         if opctx.index == (opcount - 1):
1237           # Finalize on last opcode
1238           finalize = True
1239
1240         if finalize:
1241           # All opcodes have been run, finalize job
1242           job.Finalize()
1243
1244         # Write to disk. If the job status is final, this is the final write
1245         # allowed. Once the file has been written, it can be archived anytime.
1246         queue.UpdateJobUnlocked(job)
1247
1248         assert not waitjob
1249
1250         if finalize:
1251           logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1252           return self.FINISHED
1253
1254       assert not waitjob or queue.depmgr.JobWaiting(job)
1255
1256       if waitjob:
1257         return self.WAITDEP
1258       else:
1259         return self.DEFER
1260     finally:
1261       assert job.writable, "Job became read-only while being processed"
1262       queue.release()
1263
1264
1265 def _EvaluateJobProcessorResult(depmgr, job, result):
1266   """Looks at a result from L{_JobProcessor} for a job.
1267
1268   To be used in a L{_JobQueueWorker}.
1269
1270   """
1271   if result == _JobProcessor.FINISHED:
1272     # Notify waiting jobs
1273     depmgr.NotifyWaiters(job.id)
1274
1275   elif result == _JobProcessor.DEFER:
1276     # Schedule again
1277     raise workerpool.DeferTask(priority=job.CalcPriority())
1278
1279   elif result == _JobProcessor.WAITDEP:
1280     # No-op, dependency manager will re-schedule
1281     pass
1282
1283   else:
1284     raise errors.ProgrammerError("Job processor returned unknown status %s" %
1285                                  (result, ))
1286
1287
1288 class _JobQueueWorker(workerpool.BaseWorker):
1289   """The actual job workers.
1290
1291   """
1292   def RunTask(self, job): # pylint: disable=W0221
1293     """Job executor.
1294
1295     @type job: L{_QueuedJob}
1296     @param job: the job to be processed
1297
1298     """
1299     assert job.writable, "Expected writable job"
1300
1301     # Ensure only one worker is active on a single job. If a job registers for
1302     # a dependency job, and the other job notifies before the first worker is
1303     # done, the job can end up in the tasklist more than once.
1304     job.processor_lock.acquire()
1305     try:
1306       return self._RunTaskInner(job)
1307     finally:
1308       job.processor_lock.release()
1309
1310   def _RunTaskInner(self, job):
1311     """Executes a job.
1312
1313     Must be called with per-job lock acquired.
1314
1315     """
1316     queue = job.queue
1317     assert queue == self.pool.queue
1318
1319     setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1320     setname_fn(None)
1321
1322     proc = mcpu.Processor(queue.context, job.id)
1323
1324     # Create wrapper for setting thread name
1325     wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1326                                     proc.ExecOpCode)
1327
1328     _EvaluateJobProcessorResult(queue.depmgr, job,
1329                                 _JobProcessor(queue, wrap_execop_fn, job)())
1330
1331   @staticmethod
1332   def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1333     """Updates the worker thread name to include a short summary of the opcode.
1334
1335     @param setname_fn: Callable setting worker thread name
1336     @param execop_fn: Callable for executing opcode (usually
1337                       L{mcpu.Processor.ExecOpCode})
1338
1339     """
1340     setname_fn(op)
1341     try:
1342       return execop_fn(op, *args, **kwargs)
1343     finally:
1344       setname_fn(None)
1345
1346   @staticmethod
1347   def _GetWorkerName(job, op):
1348     """Sets the worker thread name.
1349
1350     @type job: L{_QueuedJob}
1351     @type op: L{opcodes.OpCode}
1352
1353     """
1354     parts = ["Job%s" % job.id]
1355
1356     if op:
1357       parts.append(op.TinySummary())
1358
1359     return "/".join(parts)
1360
1361
1362 class _JobQueueWorkerPool(workerpool.WorkerPool):
1363   """Simple class implementing a job-processing workerpool.
1364
1365   """
1366   def __init__(self, queue):
1367     super(_JobQueueWorkerPool, self).__init__("Jq",
1368                                               JOBQUEUE_THREADS,
1369                                               _JobQueueWorker)
1370     self.queue = queue
1371
1372
1373 class _JobDependencyManager:
1374   """Keeps track of job dependencies.
1375
1376   """
1377   (WAIT,
1378    ERROR,
1379    CANCEL,
1380    CONTINUE,
1381    WRONGSTATUS) = range(1, 6)
1382
1383   def __init__(self, getstatus_fn, enqueue_fn):
1384     """Initializes this class.
1385
1386     """
1387     self._getstatus_fn = getstatus_fn
1388     self._enqueue_fn = enqueue_fn
1389
1390     self._waiters = {}
1391     self._lock = locking.SharedLock("JobDepMgr")
1392
1393   @locking.ssynchronized(_LOCK, shared=1)
1394   def GetLockInfo(self, requested): # pylint: disable=W0613
1395     """Retrieves information about waiting jobs.
1396
1397     @type requested: set
1398     @param requested: Requested information, see C{query.LQ_*}
1399
1400     """
1401     # No need to sort here, that's being done by the lock manager and query
1402     # library. There are no priorities for notifying jobs, hence all show up as
1403     # one item under "pending".
1404     return [("job/%s" % job_id, None, None,
1405              [("job", [job.id for job in waiters])])
1406             for job_id, waiters in self._waiters.items()
1407             if waiters]
1408
1409   @locking.ssynchronized(_LOCK, shared=1)
1410   def JobWaiting(self, job):
1411     """Checks if a job is waiting.
1412
1413     """
1414     return compat.any(job in jobs
1415                       for jobs in self._waiters.values())
1416
1417   @locking.ssynchronized(_LOCK)
1418   def CheckAndRegister(self, job, dep_job_id, dep_status):
1419     """Checks if a dependency job has the requested status.
1420
1421     If the other job is not yet in a finalized status, the calling job will be
1422     notified (re-added to the workerpool) at a later point.
1423
1424     @type job: L{_QueuedJob}
1425     @param job: Job object
1426     @type dep_job_id: string
1427     @param dep_job_id: ID of dependency job
1428     @type dep_status: list
1429     @param dep_status: Required status
1430
1431     """
1432     assert ht.TString(job.id)
1433     assert ht.TString(dep_job_id)
1434     assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1435
1436     if job.id == dep_job_id:
1437       return (self.ERROR, "Job can't depend on itself")
1438
1439     # Get status of dependency job
1440     try:
1441       status = self._getstatus_fn(dep_job_id)
1442     except errors.JobLost, err:
1443       return (self.ERROR, "Dependency error: %s" % err)
1444
1445     assert status in constants.JOB_STATUS_ALL
1446
1447     job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1448
1449     if status not in constants.JOBS_FINALIZED:
1450       # Register for notification and wait for job to finish
1451       job_id_waiters.add(job)
1452       return (self.WAIT,
1453               "Need to wait for job %s, wanted status '%s'" %
1454               (dep_job_id, dep_status))
1455
1456     # Remove from waiters list
1457     if job in job_id_waiters:
1458       job_id_waiters.remove(job)
1459
1460     if (status == constants.JOB_STATUS_CANCELED and
1461         constants.JOB_STATUS_CANCELED not in dep_status):
1462       return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1463
1464     elif not dep_status or status in dep_status:
1465       return (self.CONTINUE,
1466               "Dependency job %s finished with status '%s'" %
1467               (dep_job_id, status))
1468
1469     else:
1470       return (self.WRONGSTATUS,
1471               "Dependency job %s finished with status '%s',"
1472               " not one of '%s' as required" %
1473               (dep_job_id, status, utils.CommaJoin(dep_status)))
1474
1475   def _RemoveEmptyWaitersUnlocked(self):
1476     """Remove all jobs without actual waiters.
1477
1478     """
1479     for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1480                    if not waiters]:
1481       del self._waiters[job_id]
1482
1483   def NotifyWaiters(self, job_id):
1484     """Notifies all jobs waiting for a certain job ID.
1485
1486     @attention: Do not call until L{CheckAndRegister} returned a status other
1487       than C{WAITDEP} for C{job_id}, or behaviour is undefined
1488     @type job_id: string
1489     @param job_id: Job ID
1490
1491     """
1492     assert ht.TString(job_id)
1493
1494     self._lock.acquire()
1495     try:
1496       self._RemoveEmptyWaitersUnlocked()
1497
1498       jobs = self._waiters.pop(job_id, None)
1499     finally:
1500       self._lock.release()
1501
1502     if jobs:
1503       # Re-add jobs to workerpool
1504       logging.debug("Re-adding %s jobs which were waiting for job %s",
1505                     len(jobs), job_id)
1506       self._enqueue_fn(jobs)
1507
1508
1509 def _RequireOpenQueue(fn):
1510   """Decorator for "public" functions.
1511
1512   This function should be used for all 'public' functions. That is,
1513   functions usually called from other classes. Note that this should
1514   be applied only to methods (not plain functions), since it expects
1515   that the decorated function is called with a first argument that has
1516   a '_queue_filelock' argument.
1517
1518   @warning: Use this decorator only after locking.ssynchronized
1519
1520   Example::
1521     @locking.ssynchronized(_LOCK)
1522     @_RequireOpenQueue
1523     def Example(self):
1524       pass
1525
1526   """
1527   def wrapper(self, *args, **kwargs):
1528     # pylint: disable=W0212
1529     assert self._queue_filelock is not None, "Queue should be open"
1530     return fn(self, *args, **kwargs)
1531   return wrapper
1532
1533
1534 def _RequireNonDrainedQueue(fn):
1535   """Decorator checking for a non-drained queue.
1536
1537   To be used with functions submitting new jobs.
1538
1539   """
1540   def wrapper(self, *args, **kwargs):
1541     """Wrapper function.
1542
1543     @raise errors.JobQueueDrainError: if the job queue is marked for draining
1544
1545     """
1546     # Ok when sharing the big job queue lock, as the drain file is created when
1547     # the lock is exclusive.
1548     # Needs access to protected member, pylint: disable=W0212
1549     if self._drained:
1550       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1551
1552     if not self._accepting_jobs:
1553       raise errors.JobQueueError("Job queue is shutting down, refusing job")
1554
1555     return fn(self, *args, **kwargs)
1556   return wrapper
1557
1558
1559 class JobQueue(object):
1560   """Queue used to manage the jobs.
1561
1562   """
1563   def __init__(self, context):
1564     """Constructor for JobQueue.
1565
1566     The constructor will initialize the job queue object and then
1567     start loading the current jobs from disk, either for starting them
1568     (if they were queue) or for aborting them (if they were already
1569     running).
1570
1571     @type context: GanetiContext
1572     @param context: the context object for access to the configuration
1573         data and other ganeti objects
1574
1575     """
1576     self.context = context
1577     self._memcache = weakref.WeakValueDictionary()
1578     self._my_hostname = netutils.Hostname.GetSysName()
1579
1580     # The Big JobQueue lock. If a code block or method acquires it in shared
1581     # mode safe it must guarantee concurrency with all the code acquiring it in
1582     # shared mode, including itself. In order not to acquire it at all
1583     # concurrency must be guaranteed with all code acquiring it in shared mode
1584     # and all code acquiring it exclusively.
1585     self._lock = locking.SharedLock("JobQueue")
1586
1587     self.acquire = self._lock.acquire
1588     self.release = self._lock.release
1589
1590     # Accept jobs by default
1591     self._accepting_jobs = True
1592
1593     # Initialize the queue, and acquire the filelock.
1594     # This ensures no other process is working on the job queue.
1595     self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1596
1597     # Read serial file
1598     self._last_serial = jstore.ReadSerial()
1599     assert self._last_serial is not None, ("Serial file was modified between"
1600                                            " check in jstore and here")
1601
1602     # Get initial list of nodes
1603     self._nodes = dict((n.name, n.primary_ip)
1604                        for n in self.context.cfg.GetAllNodesInfo().values()
1605                        if n.master_candidate)
1606
1607     # Remove master node
1608     self._nodes.pop(self._my_hostname, None)
1609
1610     # TODO: Check consistency across nodes
1611
1612     self._queue_size = None
1613     self._UpdateQueueSizeUnlocked()
1614     assert ht.TInt(self._queue_size)
1615     self._drained = jstore.CheckDrainFlag()
1616
1617     # Job dependencies
1618     self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1619                                         self._EnqueueJobs)
1620     self.context.glm.AddToLockMonitor(self.depmgr)
1621
1622     # Setup worker pool
1623     self._wpool = _JobQueueWorkerPool(self)
1624     try:
1625       self._InspectQueue()
1626     except:
1627       self._wpool.TerminateWorkers()
1628       raise
1629
1630   @locking.ssynchronized(_LOCK)
1631   @_RequireOpenQueue
1632   def _InspectQueue(self):
1633     """Loads the whole job queue and resumes unfinished jobs.
1634
1635     This function needs the lock here because WorkerPool.AddTask() may start a
1636     job while we're still doing our work.
1637
1638     """
1639     logging.info("Inspecting job queue")
1640
1641     restartjobs = []
1642
1643     all_job_ids = self._GetJobIDsUnlocked()
1644     jobs_count = len(all_job_ids)
1645     lastinfo = time.time()
1646     for idx, job_id in enumerate(all_job_ids):
1647       # Give an update every 1000 jobs or 10 seconds
1648       if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1649           idx == (jobs_count - 1)):
1650         logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1651                      idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1652         lastinfo = time.time()
1653
1654       job = self._LoadJobUnlocked(job_id)
1655
1656       # a failure in loading the job can cause 'None' to be returned
1657       if job is None:
1658         continue
1659
1660       status = job.CalcStatus()
1661
1662       if status == constants.JOB_STATUS_QUEUED:
1663         restartjobs.append(job)
1664
1665       elif status in (constants.JOB_STATUS_RUNNING,
1666                       constants.JOB_STATUS_WAITING,
1667                       constants.JOB_STATUS_CANCELING):
1668         logging.warning("Unfinished job %s found: %s", job.id, job)
1669
1670         if status == constants.JOB_STATUS_WAITING:
1671           # Restart job
1672           job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1673           restartjobs.append(job)
1674         else:
1675           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1676                                 "Unclean master daemon shutdown")
1677           job.Finalize()
1678
1679         self.UpdateJobUnlocked(job)
1680
1681     if restartjobs:
1682       logging.info("Restarting %s jobs", len(restartjobs))
1683       self._EnqueueJobsUnlocked(restartjobs)
1684
1685     logging.info("Job queue inspection finished")
1686
1687   def _GetRpc(self, address_list):
1688     """Gets RPC runner with context.
1689
1690     """
1691     return rpc.JobQueueRunner(self.context, address_list)
1692
1693   @locking.ssynchronized(_LOCK)
1694   @_RequireOpenQueue
1695   def AddNode(self, node):
1696     """Register a new node with the queue.
1697
1698     @type node: L{objects.Node}
1699     @param node: the node object to be added
1700
1701     """
1702     node_name = node.name
1703     assert node_name != self._my_hostname
1704
1705     # Clean queue directory on added node
1706     result = self._GetRpc(None).call_jobqueue_purge(node_name)
1707     msg = result.fail_msg
1708     if msg:
1709       logging.warning("Cannot cleanup queue directory on node %s: %s",
1710                       node_name, msg)
1711
1712     if not node.master_candidate:
1713       # remove if existing, ignoring errors
1714       self._nodes.pop(node_name, None)
1715       # and skip the replication of the job ids
1716       return
1717
1718     # Upload the whole queue excluding archived jobs
1719     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1720
1721     # Upload current serial file
1722     files.append(constants.JOB_QUEUE_SERIAL_FILE)
1723
1724     # Static address list
1725     addrs = [node.primary_ip]
1726
1727     for file_name in files:
1728       # Read file content
1729       content = utils.ReadFile(file_name)
1730
1731       result = self._GetRpc(addrs).call_jobqueue_update([node_name], file_name,
1732                                                         content)
1733       msg = result[node_name].fail_msg
1734       if msg:
1735         logging.error("Failed to upload file %s to node %s: %s",
1736                       file_name, node_name, msg)
1737
1738     self._nodes[node_name] = node.primary_ip
1739
1740   @locking.ssynchronized(_LOCK)
1741   @_RequireOpenQueue
1742   def RemoveNode(self, node_name):
1743     """Callback called when removing nodes from the cluster.
1744
1745     @type node_name: str
1746     @param node_name: the name of the node to remove
1747
1748     """
1749     self._nodes.pop(node_name, None)
1750
1751   @staticmethod
1752   def _CheckRpcResult(result, nodes, failmsg):
1753     """Verifies the status of an RPC call.
1754
1755     Since we aim to keep consistency should this node (the current
1756     master) fail, we will log errors if our rpc fail, and especially
1757     log the case when more than half of the nodes fails.
1758
1759     @param result: the data as returned from the rpc call
1760     @type nodes: list
1761     @param nodes: the list of nodes we made the call to
1762     @type failmsg: str
1763     @param failmsg: the identifier to be used for logging
1764
1765     """
1766     failed = []
1767     success = []
1768
1769     for node in nodes:
1770       msg = result[node].fail_msg
1771       if msg:
1772         failed.append(node)
1773         logging.error("RPC call %s (%s) failed on node %s: %s",
1774                       result[node].call, failmsg, node, msg)
1775       else:
1776         success.append(node)
1777
1778     # +1 for the master node
1779     if (len(success) + 1) < len(failed):
1780       # TODO: Handle failing nodes
1781       logging.error("More than half of the nodes failed")
1782
1783   def _GetNodeIp(self):
1784     """Helper for returning the node name/ip list.
1785
1786     @rtype: (list, list)
1787     @return: a tuple of two lists, the first one with the node
1788         names and the second one with the node addresses
1789
1790     """
1791     # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1792     name_list = self._nodes.keys()
1793     addr_list = [self._nodes[name] for name in name_list]
1794     return name_list, addr_list
1795
1796   def _UpdateJobQueueFile(self, file_name, data, replicate):
1797     """Writes a file locally and then replicates it to all nodes.
1798
1799     This function will replace the contents of a file on the local
1800     node and then replicate it to all the other nodes we have.
1801
1802     @type file_name: str
1803     @param file_name: the path of the file to be replicated
1804     @type data: str
1805     @param data: the new contents of the file
1806     @type replicate: boolean
1807     @param replicate: whether to spread the changes to the remote nodes
1808
1809     """
1810     getents = runtime.GetEnts()
1811     utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1812                     gid=getents.masterd_gid)
1813
1814     if replicate:
1815       names, addrs = self._GetNodeIp()
1816       result = self._GetRpc(addrs).call_jobqueue_update(names, file_name, data)
1817       self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1818
1819   def _RenameFilesUnlocked(self, rename):
1820     """Renames a file locally and then replicate the change.
1821
1822     This function will rename a file in the local queue directory
1823     and then replicate this rename to all the other nodes we have.
1824
1825     @type rename: list of (old, new)
1826     @param rename: List containing tuples mapping old to new names
1827
1828     """
1829     # Rename them locally
1830     for old, new in rename:
1831       utils.RenameFile(old, new, mkdir=True)
1832
1833     # ... and on all nodes
1834     names, addrs = self._GetNodeIp()
1835     result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
1836     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1837
1838   @staticmethod
1839   def _FormatJobID(job_id):
1840     """Convert a job ID to string format.
1841
1842     Currently this just does C{str(job_id)} after performing some
1843     checks, but if we want to change the job id format this will
1844     abstract this change.
1845
1846     @type job_id: int or long
1847     @param job_id: the numeric job id
1848     @rtype: str
1849     @return: the formatted job id
1850
1851     """
1852     if not isinstance(job_id, (int, long)):
1853       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1854     if job_id < 0:
1855       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1856
1857     return str(job_id)
1858
1859   @classmethod
1860   def _GetArchiveDirectory(cls, job_id):
1861     """Returns the archive directory for a job.
1862
1863     @type job_id: str
1864     @param job_id: Job identifier
1865     @rtype: str
1866     @return: Directory name
1867
1868     """
1869     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
1870
1871   def _NewSerialsUnlocked(self, count):
1872     """Generates a new job identifier.
1873
1874     Job identifiers are unique during the lifetime of a cluster.
1875
1876     @type count: integer
1877     @param count: how many serials to return
1878     @rtype: str
1879     @return: a string representing the job identifier.
1880
1881     """
1882     assert ht.TPositiveInt(count)
1883
1884     # New number
1885     serial = self._last_serial + count
1886
1887     # Write to file
1888     self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1889                              "%s\n" % serial, True)
1890
1891     result = [self._FormatJobID(v)
1892               for v in range(self._last_serial + 1, serial + 1)]
1893
1894     # Keep it only if we were able to write the file
1895     self._last_serial = serial
1896
1897     assert len(result) == count
1898
1899     return result
1900
1901   @staticmethod
1902   def _GetJobPath(job_id):
1903     """Returns the job file for a given job id.
1904
1905     @type job_id: str
1906     @param job_id: the job identifier
1907     @rtype: str
1908     @return: the path to the job file
1909
1910     """
1911     return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1912
1913   @classmethod
1914   def _GetArchivedJobPath(cls, job_id):
1915     """Returns the archived job file for a give job id.
1916
1917     @type job_id: str
1918     @param job_id: the job identifier
1919     @rtype: str
1920     @return: the path to the archived job file
1921
1922     """
1923     return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1924                           cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1925
1926   @staticmethod
1927   def _GetJobIDsUnlocked(sort=True):
1928     """Return all known job IDs.
1929
1930     The method only looks at disk because it's a requirement that all
1931     jobs are present on disk (so in the _memcache we don't have any
1932     extra IDs).
1933
1934     @type sort: boolean
1935     @param sort: perform sorting on the returned job ids
1936     @rtype: list
1937     @return: the list of job IDs
1938
1939     """
1940     jlist = []
1941     for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1942       m = constants.JOB_FILE_RE.match(filename)
1943       if m:
1944         jlist.append(m.group(1))
1945     if sort:
1946       jlist = utils.NiceSort(jlist)
1947     return jlist
1948
1949   def _LoadJobUnlocked(self, job_id):
1950     """Loads a job from the disk or memory.
1951
1952     Given a job id, this will return the cached job object if
1953     existing, or try to load the job from the disk. If loading from
1954     disk, it will also add the job to the cache.
1955
1956     @param job_id: the job id
1957     @rtype: L{_QueuedJob} or None
1958     @return: either None or the job object
1959
1960     """
1961     job = self._memcache.get(job_id, None)
1962     if job:
1963       logging.debug("Found job %s in memcache", job_id)
1964       assert job.writable, "Found read-only job in memcache"
1965       return job
1966
1967     try:
1968       job = self._LoadJobFromDisk(job_id, False)
1969       if job is None:
1970         return job
1971     except errors.JobFileCorrupted:
1972       old_path = self._GetJobPath(job_id)
1973       new_path = self._GetArchivedJobPath(job_id)
1974       if old_path == new_path:
1975         # job already archived (future case)
1976         logging.exception("Can't parse job %s", job_id)
1977       else:
1978         # non-archived case
1979         logging.exception("Can't parse job %s, will archive.", job_id)
1980         self._RenameFilesUnlocked([(old_path, new_path)])
1981       return None
1982
1983     assert job.writable, "Job just loaded is not writable"
1984
1985     self._memcache[job_id] = job
1986     logging.debug("Added job %s to the cache", job_id)
1987     return job
1988
1989   def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
1990     """Load the given job file from disk.
1991
1992     Given a job file, read, load and restore it in a _QueuedJob format.
1993
1994     @type job_id: string
1995     @param job_id: job identifier
1996     @type try_archived: bool
1997     @param try_archived: Whether to try loading an archived job
1998     @rtype: L{_QueuedJob} or None
1999     @return: either None or the job object
2000
2001     """
2002     path_functions = [(self._GetJobPath, True)]
2003
2004     if try_archived:
2005       path_functions.append((self._GetArchivedJobPath, False))
2006
2007     raw_data = None
2008     writable_default = None
2009
2010     for (fn, writable_default) in path_functions:
2011       filepath = fn(job_id)
2012       logging.debug("Loading job from %s", filepath)
2013       try:
2014         raw_data = utils.ReadFile(filepath)
2015       except EnvironmentError, err:
2016         if err.errno != errno.ENOENT:
2017           raise
2018       else:
2019         break
2020
2021     if not raw_data:
2022       return None
2023
2024     if writable is None:
2025       writable = writable_default
2026
2027     try:
2028       data = serializer.LoadJson(raw_data)
2029       job = _QueuedJob.Restore(self, data, writable)
2030     except Exception, err: # pylint: disable=W0703
2031       raise errors.JobFileCorrupted(err)
2032
2033     return job
2034
2035   def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
2036     """Load the given job file from disk.
2037
2038     Given a job file, read, load and restore it in a _QueuedJob format.
2039     In case of error reading the job, it gets returned as None, and the
2040     exception is logged.
2041
2042     @type job_id: string
2043     @param job_id: job identifier
2044     @type try_archived: bool
2045     @param try_archived: Whether to try loading an archived job
2046     @rtype: L{_QueuedJob} or None
2047     @return: either None or the job object
2048
2049     """
2050     try:
2051       return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
2052     except (errors.JobFileCorrupted, EnvironmentError):
2053       logging.exception("Can't load/parse job %s", job_id)
2054       return None
2055
2056   def _UpdateQueueSizeUnlocked(self):
2057     """Update the queue size.
2058
2059     """
2060     self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
2061
2062   @locking.ssynchronized(_LOCK)
2063   @_RequireOpenQueue
2064   def SetDrainFlag(self, drain_flag):
2065     """Sets the drain flag for the queue.
2066
2067     @type drain_flag: boolean
2068     @param drain_flag: Whether to set or unset the drain flag
2069
2070     """
2071     jstore.SetDrainFlag(drain_flag)
2072
2073     self._drained = drain_flag
2074
2075     return True
2076
2077   @_RequireOpenQueue
2078   def _SubmitJobUnlocked(self, job_id, ops):
2079     """Create and store a new job.
2080
2081     This enters the job into our job queue and also puts it on the new
2082     queue, in order for it to be picked up by the queue processors.
2083
2084     @type job_id: job ID
2085     @param job_id: the job ID for the new job
2086     @type ops: list
2087     @param ops: The list of OpCodes that will become the new job.
2088     @rtype: L{_QueuedJob}
2089     @return: the job object to be queued
2090     @raise errors.JobQueueFull: if the job queue has too many jobs in it
2091     @raise errors.GenericError: If an opcode is not valid
2092
2093     """
2094     if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
2095       raise errors.JobQueueFull()
2096
2097     job = _QueuedJob(self, job_id, ops, True)
2098
2099     # Check priority
2100     for idx, op in enumerate(job.ops):
2101       if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
2102         allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2103         raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
2104                                   " are %s" % (idx, op.priority, allowed))
2105
2106       dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
2107       if not opcodes.TNoRelativeJobDependencies(dependencies):
2108         raise errors.GenericError("Opcode %s has invalid dependencies, must"
2109                                   " match %s: %s" %
2110                                   (idx, opcodes.TNoRelativeJobDependencies,
2111                                    dependencies))
2112
2113     # Write to disk
2114     self.UpdateJobUnlocked(job)
2115
2116     self._queue_size += 1
2117
2118     logging.debug("Adding new job %s to the cache", job_id)
2119     self._memcache[job_id] = job
2120
2121     return job
2122
2123   @locking.ssynchronized(_LOCK)
2124   @_RequireOpenQueue
2125   @_RequireNonDrainedQueue
2126   def SubmitJob(self, ops):
2127     """Create and store a new job.
2128
2129     @see: L{_SubmitJobUnlocked}
2130
2131     """
2132     (job_id, ) = self._NewSerialsUnlocked(1)
2133     self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
2134     return job_id
2135
2136   @locking.ssynchronized(_LOCK)
2137   @_RequireOpenQueue
2138   @_RequireNonDrainedQueue
2139   def SubmitManyJobs(self, jobs):
2140     """Create and store multiple jobs.
2141
2142     @see: L{_SubmitJobUnlocked}
2143
2144     """
2145     all_job_ids = self._NewSerialsUnlocked(len(jobs))
2146
2147     (results, added_jobs) = \
2148       self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])
2149
2150     self._EnqueueJobsUnlocked(added_jobs)
2151
2152     return results
2153
2154   @staticmethod
2155   def _FormatSubmitError(msg, ops):
2156     """Formats errors which occurred while submitting a job.
2157
2158     """
2159     return ("%s; opcodes %s" %
2160             (msg, utils.CommaJoin(op.Summary() for op in ops)))
2161
2162   @staticmethod
2163   def _ResolveJobDependencies(resolve_fn, deps):
2164     """Resolves relative job IDs in dependencies.
2165
2166     @type resolve_fn: callable
2167     @param resolve_fn: Function to resolve a relative job ID
2168     @type deps: list
2169     @param deps: Dependencies
2170     @rtype: list
2171     @return: Resolved dependencies
2172
2173     """
2174     result = []
2175
2176     for (dep_job_id, dep_status) in deps:
2177       if ht.TRelativeJobId(dep_job_id):
2178         assert ht.TInt(dep_job_id) and dep_job_id < 0
2179         try:
2180           job_id = resolve_fn(dep_job_id)
2181         except IndexError:
2182           # Abort
2183           return (False, "Unable to resolve relative job ID %s" % dep_job_id)
2184       else:
2185         job_id = dep_job_id
2186
2187       result.append((job_id, dep_status))
2188
2189     return (True, result)
2190
2191   def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
2192     """Create and store multiple jobs.
2193
2194     @see: L{_SubmitJobUnlocked}
2195
2196     """
2197     results = []
2198     added_jobs = []
2199
2200     def resolve_fn(job_idx, reljobid):
2201       assert reljobid < 0
2202       return (previous_job_ids + job_ids[:job_idx])[reljobid]
2203
2204     for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
2205       for op in ops:
2206         if getattr(op, opcodes.DEPEND_ATTR, None):
2207           (status, data) = \
2208             self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
2209                                          op.depends)
2210           if not status:
2211             # Abort resolving dependencies
2212             assert ht.TNonEmptyString(data), "No error message"
2213             break
2214           # Use resolved dependencies
2215           op.depends = data
2216       else:
2217         try:
2218           job = self._SubmitJobUnlocked(job_id, ops)
2219         except errors.GenericError, err:
2220           status = False
2221           data = self._FormatSubmitError(str(err), ops)
2222         else:
2223           status = True
2224           data = job_id
2225           added_jobs.append(job)
2226
2227       results.append((status, data))
2228
2229     return (results, added_jobs)
2230
2231   @locking.ssynchronized(_LOCK)
2232   def _EnqueueJobs(self, jobs):
2233     """Helper function to add jobs to worker pool's queue.
2234
2235     @type jobs: list
2236     @param jobs: List of all jobs
2237
2238     """
2239     return self._EnqueueJobsUnlocked(jobs)
2240
2241   def _EnqueueJobsUnlocked(self, jobs):
2242     """Helper function to add jobs to worker pool's queue.
2243
2244     @type jobs: list
2245     @param jobs: List of all jobs
2246
2247     """
2248     assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
2249     self._wpool.AddManyTasks([(job, ) for job in jobs],
2250                              priority=[job.CalcPriority() for job in jobs])
2251
2252   def _GetJobStatusForDependencies(self, job_id):
2253     """Gets the status of a job for dependencies.
2254
2255     @type job_id: string
2256     @param job_id: Job ID
2257     @raise errors.JobLost: If job can't be found
2258
2259     """
2260     if not isinstance(job_id, basestring):
2261       job_id = self._FormatJobID(job_id)
2262
2263     # Not using in-memory cache as doing so would require an exclusive lock
2264
2265     # Try to load from disk
2266     job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2267
2268     assert not job.writable, "Got writable job" # pylint: disable=E1101
2269
2270     if job:
2271       return job.CalcStatus()
2272
2273     raise errors.JobLost("Job %s not found" % job_id)
2274
2275   @_RequireOpenQueue
2276   def UpdateJobUnlocked(self, job, replicate=True):
2277     """Update a job's on disk storage.
2278
2279     After a job has been modified, this function needs to be called in
2280     order to write the changes to disk and replicate them to the other
2281     nodes.
2282
2283     @type job: L{_QueuedJob}
2284     @param job: the changed job
2285     @type replicate: boolean
2286     @param replicate: whether to replicate the change to remote nodes
2287
2288     """
2289     if __debug__:
2290       finalized = job.CalcStatus() in constants.JOBS_FINALIZED
2291       assert (finalized ^ (job.end_timestamp is None))
2292       assert job.writable, "Can't update read-only job"
2293
2294     filename = self._GetJobPath(job.id)
2295     data = serializer.DumpJson(job.Serialize())
2296     logging.debug("Writing job %s to %s", job.id, filename)
2297     self._UpdateJobQueueFile(filename, data, replicate)
2298
2299   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
2300                         timeout):
2301     """Waits for changes in a job.
2302
2303     @type job_id: string
2304     @param job_id: Job identifier
2305     @type fields: list of strings
2306     @param fields: Which fields to check for changes
2307     @type prev_job_info: list or None
2308     @param prev_job_info: Last job information returned
2309     @type prev_log_serial: int
2310     @param prev_log_serial: Last job message serial number
2311     @type timeout: float
2312     @param timeout: maximum time to wait in seconds
2313     @rtype: tuple (job info, log entries)
2314     @return: a tuple of the job information as required via
2315         the fields parameter, and the log entries as a list
2316
2317         if the job has not changed and the timeout has expired,
2318         we instead return a special value,
2319         L{constants.JOB_NOTCHANGED}, which should be interpreted
2320         as such by the clients
2321
2322     """
2323     load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
2324                              writable=False)
2325
2326     helper = _WaitForJobChangesHelper()
2327
2328     return helper(self._GetJobPath(job_id), load_fn,
2329                   fields, prev_job_info, prev_log_serial, timeout)
2330
2331   @locking.ssynchronized(_LOCK)
2332   @_RequireOpenQueue
2333   def CancelJob(self, job_id):
2334     """Cancels a job.
2335
2336     This will only succeed if the job has not started yet.
2337
2338     @type job_id: string
2339     @param job_id: job ID of job to be cancelled.
2340
2341     """
2342     logging.info("Cancelling job %s", job_id)
2343
2344     job = self._LoadJobUnlocked(job_id)
2345     if not job:
2346       logging.debug("Job %s not found", job_id)
2347       return (False, "Job %s not found" % job_id)
2348
2349     assert job.writable, "Can't cancel read-only job"
2350
2351     (success, msg) = job.Cancel()
2352
2353     if success:
2354       # If the job was finalized (e.g. cancelled), this is the final write
2355       # allowed. The job can be archived anytime.
2356       self.UpdateJobUnlocked(job)
2357
2358     return (success, msg)
2359
2360   @_RequireOpenQueue
2361   def _ArchiveJobsUnlocked(self, jobs):
2362     """Archives jobs.
2363
2364     @type jobs: list of L{_QueuedJob}
2365     @param jobs: Job objects
2366     @rtype: int
2367     @return: Number of archived jobs
2368
2369     """
2370     archive_jobs = []
2371     rename_files = []
2372     for job in jobs:
2373       assert job.writable, "Can't archive read-only job"
2374
2375       if job.CalcStatus() not in constants.JOBS_FINALIZED:
2376         logging.debug("Job %s is not yet done", job.id)
2377         continue
2378
2379       archive_jobs.append(job)
2380
2381       old = self._GetJobPath(job.id)
2382       new = self._GetArchivedJobPath(job.id)
2383       rename_files.append((old, new))
2384
2385     # TODO: What if 1..n files fail to rename?
2386     self._RenameFilesUnlocked(rename_files)
2387
2388     logging.debug("Successfully archived job(s) %s",
2389                   utils.CommaJoin(job.id for job in archive_jobs))
2390
2391     # Since we haven't quite checked, above, if we succeeded or failed renaming
2392     # the files, we update the cached queue size from the filesystem. When we
2393     # get around to fix the TODO: above, we can use the number of actually
2394     # archived jobs to fix this.
2395     self._UpdateQueueSizeUnlocked()
2396     return len(archive_jobs)
2397
2398   @locking.ssynchronized(_LOCK)
2399   @_RequireOpenQueue
2400   def ArchiveJob(self, job_id):
2401     """Archives a job.
2402
2403     This is just a wrapper over L{_ArchiveJobsUnlocked}.
2404
2405     @type job_id: string
2406     @param job_id: Job ID of job to be archived.
2407     @rtype: bool
2408     @return: Whether job was archived
2409
2410     """
2411     logging.info("Archiving job %s", job_id)
2412
2413     job = self._LoadJobUnlocked(job_id)
2414     if not job:
2415       logging.debug("Job %s not found", job_id)
2416       return False
2417
2418     return self._ArchiveJobsUnlocked([job]) == 1
2419
2420   @locking.ssynchronized(_LOCK)
2421   @_RequireOpenQueue
2422   def AutoArchiveJobs(self, age, timeout):
2423     """Archives all jobs based on age.
2424
2425     The method will archive all jobs which are older than the age
2426     parameter. For jobs that don't have an end timestamp, the start
2427     timestamp will be considered. The special '-1' age will cause
2428     archival of all jobs (that are not running or queued).
2429
2430     @type age: int
2431     @param age: the minimum age in seconds
2432
2433     """
2434     logging.info("Archiving jobs with age more than %s seconds", age)
2435
2436     now = time.time()
2437     end_time = now + timeout
2438     archived_count = 0
2439     last_touched = 0
2440
2441     all_job_ids = self._GetJobIDsUnlocked()
2442     pending = []
2443     for idx, job_id in enumerate(all_job_ids):
2444       last_touched = idx + 1
2445
2446       # Not optimal because jobs could be pending
2447       # TODO: Measure average duration for job archival and take number of
2448       # pending jobs into account.
2449       if time.time() > end_time:
2450         break
2451
2452       # Returns None if the job failed to load
2453       job = self._LoadJobUnlocked(job_id)
2454       if job:
2455         if job.end_timestamp is None:
2456           if job.start_timestamp is None:
2457             job_age = job.received_timestamp
2458           else:
2459             job_age = job.start_timestamp
2460         else:
2461           job_age = job.end_timestamp
2462
2463         if age == -1 or now - job_age[0] > age:
2464           pending.append(job)
2465
2466           # Archive 10 jobs at a time
2467           if len(pending) >= 10:
2468             archived_count += self._ArchiveJobsUnlocked(pending)
2469             pending = []
2470
2471     if pending:
2472       archived_count += self._ArchiveJobsUnlocked(pending)
2473
2474     return (archived_count, len(all_job_ids) - last_touched)
2475
2476   def _Query(self, fields, qfilter):
2477     qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
2478                        namefield="id")
2479
2480     job_ids = qobj.RequestedNames()
2481
2482     list_all = (job_ids is None)
2483
2484     if list_all:
2485       # Since files are added to/removed from the queue atomically, there's no
2486       # risk of getting the job ids in an inconsistent state.
2487       job_ids = self._GetJobIDsUnlocked()
2488
2489     jobs = []
2490
2491     for job_id in job_ids:
2492       job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2493       if job is not None or not list_all:
2494         jobs.append((job_id, job))
2495
2496     return (qobj, jobs, list_all)
2497
2498   def QueryJobs(self, fields, qfilter):
2499     """Returns a list of jobs in queue.
2500
2501     @type fields: sequence
2502     @param fields: List of wanted fields
2503     @type qfilter: None or query2 filter (list)
2504     @param qfilter: Query filter
2505
2506     """
2507     (qobj, ctx, sort_by_name) = self._Query(fields, qfilter)
2508
2509     return query.GetQueryResponse(qobj, ctx, sort_by_name=sort_by_name)
2510
2511   def OldStyleQueryJobs(self, job_ids, fields):
2512     """Returns a list of jobs in queue.
2513
2514     @type job_ids: list
2515     @param job_ids: sequence of job identifiers or None for all
2516     @type fields: list
2517     @param fields: names of fields to return
2518     @rtype: list
2519     @return: list one element per job, each element being list with
2520         the requested fields
2521
2522     """
2523     qfilter = qlang.MakeSimpleFilter("id", job_ids)
2524
2525     (qobj, ctx, sort_by_name) = self._Query(fields, qfilter)
2526
2527     return qobj.OldStyleQuery(ctx, sort_by_name=sort_by_name)
2528
2529   @locking.ssynchronized(_LOCK)
2530   def PrepareShutdown(self):
2531     """Prepare to stop the job queue.
2532
2533     Disables execution of jobs in the workerpool and returns whether there are
2534     any jobs currently running. If the latter is the case, the job queue is not
2535     yet ready for shutdown. Once this function returns C{True} L{Shutdown} can
2536     be called without interfering with any job. Queued and unfinished jobs will
2537     be resumed next time.
2538
2539     Once this function has been called no new job submissions will be accepted
2540     (see L{_RequireNonDrainedQueue}).
2541
2542     @rtype: bool
2543     @return: Whether there are any running jobs
2544
2545     """
2546     if self._accepting_jobs:
2547       self._accepting_jobs = False
2548
2549       # Tell worker pool to stop processing pending tasks
2550       self._wpool.SetActive(False)
2551
2552     return self._wpool.HasRunningTasks()
2553
2554   def AcceptingJobsUnlocked(self):
2555     """Returns whether jobs are accepted.
2556
2557     Once L{PrepareShutdown} has been called, no new jobs are accepted and the
2558     queue is shutting down.
2559
2560     @rtype: bool
2561
2562     """
2563     return self._accepting_jobs
2564
2565   @locking.ssynchronized(_LOCK)
2566   @_RequireOpenQueue
2567   def Shutdown(self):
2568     """Stops the job queue.
2569
2570     This shutdowns all the worker threads an closes the queue.
2571
2572     """
2573     self._wpool.TerminateWorkers()
2574
2575     self._queue_filelock.Close()
2576     self._queue_filelock = None