Always_failover doesn't require --allow-failover anymore
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import logging
33 import errno
34 import time
35 import weakref
36 import threading
37 import itertools
38
39 try:
40   # pylint: disable=E0611
41   from pyinotify import pyinotify
42 except ImportError:
43   import pyinotify
44
45 from ganeti import asyncnotifier
46 from ganeti import constants
47 from ganeti import serializer
48 from ganeti import workerpool
49 from ganeti import locking
50 from ganeti import opcodes
51 from ganeti import errors
52 from ganeti import mcpu
53 from ganeti import utils
54 from ganeti import jstore
55 from ganeti import rpc
56 from ganeti import runtime
57 from ganeti import netutils
58 from ganeti import compat
59 from ganeti import ht
60 from ganeti import query
61 from ganeti import qlang
62
63
64 JOBQUEUE_THREADS = 25
65 JOBS_PER_ARCHIVE_DIRECTORY = 10000
66
67 # member lock names to be passed to @ssynchronized decorator
68 _LOCK = "_lock"
69 _QUEUE = "_queue"
70
71
72 class CancelJob(Exception):
73   """Special exception to cancel a job.
74
75   """
76
77
78 def TimeStampNow():
79   """Returns the current timestamp.
80
81   @rtype: tuple
82   @return: the current time in the (seconds, microseconds) format
83
84   """
85   return utils.SplitTime(time.time())
86
87
88 class _SimpleJobQuery:
89   """Wrapper for job queries.
90
91   Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.
92
93   """
94   def __init__(self, fields):
95     """Initializes this class.
96
97     """
98     self._query = query.Query(query.JOB_FIELDS, fields)
99
100   def __call__(self, job):
101     """Executes a job query using cached field list.
102
103     """
104     return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
105
106
107 class _QueuedOpCode(object):
108   """Encapsulates an opcode object.
109
110   @ivar log: holds the execution log and consists of tuples
111   of the form C{(log_serial, timestamp, level, message)}
112   @ivar input: the OpCode we encapsulate
113   @ivar status: the current status
114   @ivar result: the result of the LU execution
115   @ivar start_timestamp: timestamp for the start of the execution
116   @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
117   @ivar stop_timestamp: timestamp for the end of the execution
118
119   """
120   __slots__ = ["input", "status", "result", "log", "priority",
121                "start_timestamp", "exec_timestamp", "end_timestamp",
122                "__weakref__"]
123
124   def __init__(self, op):
125     """Initializes instances of this class.
126
127     @type op: L{opcodes.OpCode}
128     @param op: the opcode we encapsulate
129
130     """
131     self.input = op
132     self.status = constants.OP_STATUS_QUEUED
133     self.result = None
134     self.log = []
135     self.start_timestamp = None
136     self.exec_timestamp = None
137     self.end_timestamp = None
138
139     # Get initial priority (it might change during the lifetime of this opcode)
140     self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
141
142   @classmethod
143   def Restore(cls, state):
144     """Restore the _QueuedOpCode from the serialized form.
145
146     @type state: dict
147     @param state: the serialized state
148     @rtype: _QueuedOpCode
149     @return: a new _QueuedOpCode instance
150
151     """
152     obj = _QueuedOpCode.__new__(cls)
153     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
154     obj.status = state["status"]
155     obj.result = state["result"]
156     obj.log = state["log"]
157     obj.start_timestamp = state.get("start_timestamp", None)
158     obj.exec_timestamp = state.get("exec_timestamp", None)
159     obj.end_timestamp = state.get("end_timestamp", None)
160     obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
161     return obj
162
163   def Serialize(self):
164     """Serializes this _QueuedOpCode.
165
166     @rtype: dict
167     @return: the dictionary holding the serialized state
168
169     """
170     return {
171       "input": self.input.__getstate__(),
172       "status": self.status,
173       "result": self.result,
174       "log": self.log,
175       "start_timestamp": self.start_timestamp,
176       "exec_timestamp": self.exec_timestamp,
177       "end_timestamp": self.end_timestamp,
178       "priority": self.priority,
179       }
180
181
182 class _QueuedJob(object):
183   """In-memory job representation.
184
185   This is what we use to track the user-submitted jobs. Locking must
186   be taken care of by users of this class.
187
188   @type queue: L{JobQueue}
189   @ivar queue: the parent queue
190   @ivar id: the job ID
191   @type ops: list
192   @ivar ops: the list of _QueuedOpCode that constitute the job
193   @type log_serial: int
194   @ivar log_serial: holds the index for the next log entry
195   @ivar received_timestamp: the timestamp for when the job was received
196   @ivar start_timestmap: the timestamp for start of execution
197   @ivar end_timestamp: the timestamp for end of execution
198   @ivar writable: Whether the job is allowed to be modified
199
200   """
201   # pylint: disable=W0212
202   __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
203                "received_timestamp", "start_timestamp", "end_timestamp",
204                "__weakref__", "processor_lock", "writable"]
205
206   def __init__(self, queue, job_id, ops, writable):
207     """Constructor for the _QueuedJob.
208
209     @type queue: L{JobQueue}
210     @param queue: our parent queue
211     @type job_id: job_id
212     @param job_id: our job id
213     @type ops: list
214     @param ops: the list of opcodes we hold, which will be encapsulated
215         in _QueuedOpCodes
216     @type writable: bool
217     @param writable: Whether job can be modified
218
219     """
220     if not ops:
221       raise errors.GenericError("A job needs at least one opcode")
222
223     self.queue = queue
224     self.id = job_id
225     self.ops = [_QueuedOpCode(op) for op in ops]
226     self.log_serial = 0
227     self.received_timestamp = TimeStampNow()
228     self.start_timestamp = None
229     self.end_timestamp = None
230
231     self._InitInMemory(self, writable)
232
233   @staticmethod
234   def _InitInMemory(obj, writable):
235     """Initializes in-memory variables.
236
237     """
238     obj.writable = writable
239     obj.ops_iter = None
240     obj.cur_opctx = None
241
242     # Read-only jobs are not processed and therefore don't need a lock
243     if writable:
244       obj.processor_lock = threading.Lock()
245     else:
246       obj.processor_lock = None
247
248   def __repr__(self):
249     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
250               "id=%s" % self.id,
251               "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
252
253     return "<%s at %#x>" % (" ".join(status), id(self))
254
255   @classmethod
256   def Restore(cls, queue, state, writable):
257     """Restore a _QueuedJob from serialized state:
258
259     @type queue: L{JobQueue}
260     @param queue: to which queue the restored job belongs
261     @type state: dict
262     @param state: the serialized state
263     @type writable: bool
264     @param writable: Whether job can be modified
265     @rtype: _JobQueue
266     @return: the restored _JobQueue instance
267
268     """
269     obj = _QueuedJob.__new__(cls)
270     obj.queue = queue
271     obj.id = state["id"]
272     obj.received_timestamp = state.get("received_timestamp", None)
273     obj.start_timestamp = state.get("start_timestamp", None)
274     obj.end_timestamp = state.get("end_timestamp", None)
275
276     obj.ops = []
277     obj.log_serial = 0
278     for op_state in state["ops"]:
279       op = _QueuedOpCode.Restore(op_state)
280       for log_entry in op.log:
281         obj.log_serial = max(obj.log_serial, log_entry[0])
282       obj.ops.append(op)
283
284     cls._InitInMemory(obj, writable)
285
286     return obj
287
288   def Serialize(self):
289     """Serialize the _JobQueue instance.
290
291     @rtype: dict
292     @return: the serialized state
293
294     """
295     return {
296       "id": self.id,
297       "ops": [op.Serialize() for op in self.ops],
298       "start_timestamp": self.start_timestamp,
299       "end_timestamp": self.end_timestamp,
300       "received_timestamp": self.received_timestamp,
301       }
302
303   def CalcStatus(self):
304     """Compute the status of this job.
305
306     This function iterates over all the _QueuedOpCodes in the job and
307     based on their status, computes the job status.
308
309     The algorithm is:
310       - if we find a cancelled, or finished with error, the job
311         status will be the same
312       - otherwise, the last opcode with the status one of:
313           - waitlock
314           - canceling
315           - running
316
317         will determine the job status
318
319       - otherwise, it means either all opcodes are queued, or success,
320         and the job status will be the same
321
322     @return: the job status
323
324     """
325     status = constants.JOB_STATUS_QUEUED
326
327     all_success = True
328     for op in self.ops:
329       if op.status == constants.OP_STATUS_SUCCESS:
330         continue
331
332       all_success = False
333
334       if op.status == constants.OP_STATUS_QUEUED:
335         pass
336       elif op.status == constants.OP_STATUS_WAITING:
337         status = constants.JOB_STATUS_WAITING
338       elif op.status == constants.OP_STATUS_RUNNING:
339         status = constants.JOB_STATUS_RUNNING
340       elif op.status == constants.OP_STATUS_CANCELING:
341         status = constants.JOB_STATUS_CANCELING
342         break
343       elif op.status == constants.OP_STATUS_ERROR:
344         status = constants.JOB_STATUS_ERROR
345         # The whole job fails if one opcode failed
346         break
347       elif op.status == constants.OP_STATUS_CANCELED:
348         status = constants.OP_STATUS_CANCELED
349         break
350
351     if all_success:
352       status = constants.JOB_STATUS_SUCCESS
353
354     return status
355
356   def CalcPriority(self):
357     """Gets the current priority for this job.
358
359     Only unfinished opcodes are considered. When all are done, the default
360     priority is used.
361
362     @rtype: int
363
364     """
365     priorities = [op.priority for op in self.ops
366                   if op.status not in constants.OPS_FINALIZED]
367
368     if not priorities:
369       # All opcodes are done, assume default priority
370       return constants.OP_PRIO_DEFAULT
371
372     return min(priorities)
373
374   def GetLogEntries(self, newer_than):
375     """Selectively returns the log entries.
376
377     @type newer_than: None or int
378     @param newer_than: if this is None, return all log entries,
379         otherwise return only the log entries with serial higher
380         than this value
381     @rtype: list
382     @return: the list of the log entries selected
383
384     """
385     if newer_than is None:
386       serial = -1
387     else:
388       serial = newer_than
389
390     entries = []
391     for op in self.ops:
392       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
393
394     return entries
395
396   def GetInfo(self, fields):
397     """Returns information about a job.
398
399     @type fields: list
400     @param fields: names of fields to return
401     @rtype: list
402     @return: list with one element for each field
403     @raise errors.OpExecError: when an invalid field
404         has been passed
405
406     """
407     return _SimpleJobQuery(fields)(self)
408
409   def MarkUnfinishedOps(self, status, result):
410     """Mark unfinished opcodes with a given status and result.
411
412     This is an utility function for marking all running or waiting to
413     be run opcodes with a given status. Opcodes which are already
414     finalised are not changed.
415
416     @param status: a given opcode status
417     @param result: the opcode result
418
419     """
420     not_marked = True
421     for op in self.ops:
422       if op.status in constants.OPS_FINALIZED:
423         assert not_marked, "Finalized opcodes found after non-finalized ones"
424         continue
425       op.status = status
426       op.result = result
427       not_marked = False
428
429   def Finalize(self):
430     """Marks the job as finalized.
431
432     """
433     self.end_timestamp = TimeStampNow()
434
435   def Cancel(self):
436     """Marks job as canceled/-ing if possible.
437
438     @rtype: tuple; (bool, string)
439     @return: Boolean describing whether job was successfully canceled or marked
440       as canceling and a text message
441
442     """
443     status = self.CalcStatus()
444
445     if status == constants.JOB_STATUS_QUEUED:
446       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
447                              "Job canceled by request")
448       self.Finalize()
449       return (True, "Job %s canceled" % self.id)
450
451     elif status == constants.JOB_STATUS_WAITING:
452       # The worker will notice the new status and cancel the job
453       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
454       return (True, "Job %s will be canceled" % self.id)
455
456     else:
457       logging.debug("Job %s is no longer waiting in the queue", self.id)
458       return (False, "Job %s is no longer waiting in the queue" % self.id)
459
460
461 class _OpExecCallbacks(mcpu.OpExecCbBase):
462   def __init__(self, queue, job, op):
463     """Initializes this class.
464
465     @type queue: L{JobQueue}
466     @param queue: Job queue
467     @type job: L{_QueuedJob}
468     @param job: Job object
469     @type op: L{_QueuedOpCode}
470     @param op: OpCode
471
472     """
473     assert queue, "Queue is missing"
474     assert job, "Job is missing"
475     assert op, "Opcode is missing"
476
477     self._queue = queue
478     self._job = job
479     self._op = op
480
481   def _CheckCancel(self):
482     """Raises an exception to cancel the job if asked to.
483
484     """
485     # Cancel here if we were asked to
486     if self._op.status == constants.OP_STATUS_CANCELING:
487       logging.debug("Canceling opcode")
488       raise CancelJob()
489
490   @locking.ssynchronized(_QUEUE, shared=1)
491   def NotifyStart(self):
492     """Mark the opcode as running, not lock-waiting.
493
494     This is called from the mcpu code as a notifier function, when the LU is
495     finally about to start the Exec() method. Of course, to have end-user
496     visible results, the opcode must be initially (before calling into
497     Processor.ExecOpCode) set to OP_STATUS_WAITING.
498
499     """
500     assert self._op in self._job.ops
501     assert self._op.status in (constants.OP_STATUS_WAITING,
502                                constants.OP_STATUS_CANCELING)
503
504     # Cancel here if we were asked to
505     self._CheckCancel()
506
507     logging.debug("Opcode is now running")
508
509     self._op.status = constants.OP_STATUS_RUNNING
510     self._op.exec_timestamp = TimeStampNow()
511
512     # And finally replicate the job status
513     self._queue.UpdateJobUnlocked(self._job)
514
515   @locking.ssynchronized(_QUEUE, shared=1)
516   def _AppendFeedback(self, timestamp, log_type, log_msg):
517     """Internal feedback append function, with locks
518
519     """
520     self._job.log_serial += 1
521     self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
522     self._queue.UpdateJobUnlocked(self._job, replicate=False)
523
524   def Feedback(self, *args):
525     """Append a log entry.
526
527     """
528     assert len(args) < 3
529
530     if len(args) == 1:
531       log_type = constants.ELOG_MESSAGE
532       log_msg = args[0]
533     else:
534       (log_type, log_msg) = args
535
536     # The time is split to make serialization easier and not lose
537     # precision.
538     timestamp = utils.SplitTime(time.time())
539     self._AppendFeedback(timestamp, log_type, log_msg)
540
541   def CheckCancel(self):
542     """Check whether job has been cancelled.
543
544     """
545     assert self._op.status in (constants.OP_STATUS_WAITING,
546                                constants.OP_STATUS_CANCELING)
547
548     # Cancel here if we were asked to
549     self._CheckCancel()
550
551   def SubmitManyJobs(self, jobs):
552     """Submits jobs for processing.
553
554     See L{JobQueue.SubmitManyJobs}.
555
556     """
557     # Locking is done in job queue
558     return self._queue.SubmitManyJobs(jobs)
559
560
561 class _JobChangesChecker(object):
562   def __init__(self, fields, prev_job_info, prev_log_serial):
563     """Initializes this class.
564
565     @type fields: list of strings
566     @param fields: Fields requested by LUXI client
567     @type prev_job_info: string
568     @param prev_job_info: previous job info, as passed by the LUXI client
569     @type prev_log_serial: string
570     @param prev_log_serial: previous job serial, as passed by the LUXI client
571
572     """
573     self._squery = _SimpleJobQuery(fields)
574     self._prev_job_info = prev_job_info
575     self._prev_log_serial = prev_log_serial
576
577   def __call__(self, job):
578     """Checks whether job has changed.
579
580     @type job: L{_QueuedJob}
581     @param job: Job object
582
583     """
584     assert not job.writable, "Expected read-only job"
585
586     status = job.CalcStatus()
587     job_info = self._squery(job)
588     log_entries = job.GetLogEntries(self._prev_log_serial)
589
590     # Serializing and deserializing data can cause type changes (e.g. from
591     # tuple to list) or precision loss. We're doing it here so that we get
592     # the same modifications as the data received from the client. Without
593     # this, the comparison afterwards might fail without the data being
594     # significantly different.
595     # TODO: we just deserialized from disk, investigate how to make sure that
596     # the job info and log entries are compatible to avoid this further step.
597     # TODO: Doing something like in testutils.py:UnifyValueType might be more
598     # efficient, though floats will be tricky
599     job_info = serializer.LoadJson(serializer.DumpJson(job_info))
600     log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
601
602     # Don't even try to wait if the job is no longer running, there will be
603     # no changes.
604     if (status not in (constants.JOB_STATUS_QUEUED,
605                        constants.JOB_STATUS_RUNNING,
606                        constants.JOB_STATUS_WAITING) or
607         job_info != self._prev_job_info or
608         (log_entries and self._prev_log_serial != log_entries[0][0])):
609       logging.debug("Job %s changed", job.id)
610       return (job_info, log_entries)
611
612     return None
613
614
615 class _JobFileChangesWaiter(object):
616   def __init__(self, filename):
617     """Initializes this class.
618
619     @type filename: string
620     @param filename: Path to job file
621     @raises errors.InotifyError: if the notifier cannot be setup
622
623     """
624     self._wm = pyinotify.WatchManager()
625     self._inotify_handler = \
626       asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
627     self._notifier = \
628       pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
629     try:
630       self._inotify_handler.enable()
631     except Exception:
632       # pyinotify doesn't close file descriptors automatically
633       self._notifier.stop()
634       raise
635
636   def _OnInotify(self, notifier_enabled):
637     """Callback for inotify.
638
639     """
640     if not notifier_enabled:
641       self._inotify_handler.enable()
642
643   def Wait(self, timeout):
644     """Waits for the job file to change.
645
646     @type timeout: float
647     @param timeout: Timeout in seconds
648     @return: Whether there have been events
649
650     """
651     assert timeout >= 0
652     have_events = self._notifier.check_events(timeout * 1000)
653     if have_events:
654       self._notifier.read_events()
655     self._notifier.process_events()
656     return have_events
657
658   def Close(self):
659     """Closes underlying notifier and its file descriptor.
660
661     """
662     self._notifier.stop()
663
664
665 class _JobChangesWaiter(object):
666   def __init__(self, filename):
667     """Initializes this class.
668
669     @type filename: string
670     @param filename: Path to job file
671
672     """
673     self._filewaiter = None
674     self._filename = filename
675
676   def Wait(self, timeout):
677     """Waits for a job to change.
678
679     @type timeout: float
680     @param timeout: Timeout in seconds
681     @return: Whether there have been events
682
683     """
684     if self._filewaiter:
685       return self._filewaiter.Wait(timeout)
686
687     # Lazy setup: Avoid inotify setup cost when job file has already changed.
688     # If this point is reached, return immediately and let caller check the job
689     # file again in case there were changes since the last check. This avoids a
690     # race condition.
691     self._filewaiter = _JobFileChangesWaiter(self._filename)
692
693     return True
694
695   def Close(self):
696     """Closes underlying waiter.
697
698     """
699     if self._filewaiter:
700       self._filewaiter.Close()
701
702
703 class _WaitForJobChangesHelper(object):
704   """Helper class using inotify to wait for changes in a job file.
705
706   This class takes a previous job status and serial, and alerts the client when
707   the current job status has changed.
708
709   """
710   @staticmethod
711   def _CheckForChanges(counter, job_load_fn, check_fn):
712     if counter.next() > 0:
713       # If this isn't the first check the job is given some more time to change
714       # again. This gives better performance for jobs generating many
715       # changes/messages.
716       time.sleep(0.1)
717
718     job = job_load_fn()
719     if not job:
720       raise errors.JobLost()
721
722     result = check_fn(job)
723     if result is None:
724       raise utils.RetryAgain()
725
726     return result
727
728   def __call__(self, filename, job_load_fn,
729                fields, prev_job_info, prev_log_serial, timeout):
730     """Waits for changes on a job.
731
732     @type filename: string
733     @param filename: File on which to wait for changes
734     @type job_load_fn: callable
735     @param job_load_fn: Function to load job
736     @type fields: list of strings
737     @param fields: Which fields to check for changes
738     @type prev_job_info: list or None
739     @param prev_job_info: Last job information returned
740     @type prev_log_serial: int
741     @param prev_log_serial: Last job message serial number
742     @type timeout: float
743     @param timeout: maximum time to wait in seconds
744
745     """
746     counter = itertools.count()
747     try:
748       check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
749       waiter = _JobChangesWaiter(filename)
750       try:
751         return utils.Retry(compat.partial(self._CheckForChanges,
752                                           counter, job_load_fn, check_fn),
753                            utils.RETRY_REMAINING_TIME, timeout,
754                            wait_fn=waiter.Wait)
755       finally:
756         waiter.Close()
757     except (errors.InotifyError, errors.JobLost):
758       return None
759     except utils.RetryTimeout:
760       return constants.JOB_NOTCHANGED
761
762
763 def _EncodeOpError(err):
764   """Encodes an error which occurred while processing an opcode.
765
766   """
767   if isinstance(err, errors.GenericError):
768     to_encode = err
769   else:
770     to_encode = errors.OpExecError(str(err))
771
772   return errors.EncodeException(to_encode)
773
774
775 class _TimeoutStrategyWrapper:
776   def __init__(self, fn):
777     """Initializes this class.
778
779     """
780     self._fn = fn
781     self._next = None
782
783   def _Advance(self):
784     """Gets the next timeout if necessary.
785
786     """
787     if self._next is None:
788       self._next = self._fn()
789
790   def Peek(self):
791     """Returns the next timeout.
792
793     """
794     self._Advance()
795     return self._next
796
797   def Next(self):
798     """Returns the current timeout and advances the internal state.
799
800     """
801     self._Advance()
802     result = self._next
803     self._next = None
804     return result
805
806
807 class _OpExecContext:
808   def __init__(self, op, index, log_prefix, timeout_strategy_factory):
809     """Initializes this class.
810
811     """
812     self.op = op
813     self.index = index
814     self.log_prefix = log_prefix
815     self.summary = op.input.Summary()
816
817     # Create local copy to modify
818     if getattr(op.input, opcodes.DEPEND_ATTR, None):
819       self.jobdeps = op.input.depends[:]
820     else:
821       self.jobdeps = None
822
823     self._timeout_strategy_factory = timeout_strategy_factory
824     self._ResetTimeoutStrategy()
825
826   def _ResetTimeoutStrategy(self):
827     """Creates a new timeout strategy.
828
829     """
830     self._timeout_strategy = \
831       _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
832
833   def CheckPriorityIncrease(self):
834     """Checks whether priority can and should be increased.
835
836     Called when locks couldn't be acquired.
837
838     """
839     op = self.op
840
841     # Exhausted all retries and next round should not use blocking acquire
842     # for locks?
843     if (self._timeout_strategy.Peek() is None and
844         op.priority > constants.OP_PRIO_HIGHEST):
845       logging.debug("Increasing priority")
846       op.priority -= 1
847       self._ResetTimeoutStrategy()
848       return True
849
850     return False
851
852   def GetNextLockTimeout(self):
853     """Returns the next lock acquire timeout.
854
855     """
856     return self._timeout_strategy.Next()
857
858
859 class _JobProcessor(object):
860   (DEFER,
861    WAITDEP,
862    FINISHED) = range(1, 4)
863
864   def __init__(self, queue, opexec_fn, job,
865                _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
866     """Initializes this class.
867
868     """
869     self.queue = queue
870     self.opexec_fn = opexec_fn
871     self.job = job
872     self._timeout_strategy_factory = _timeout_strategy_factory
873
874   @staticmethod
875   def _FindNextOpcode(job, timeout_strategy_factory):
876     """Locates the next opcode to run.
877
878     @type job: L{_QueuedJob}
879     @param job: Job object
880     @param timeout_strategy_factory: Callable to create new timeout strategy
881
882     """
883     # Create some sort of a cache to speed up locating next opcode for future
884     # lookups
885     # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
886     # pending and one for processed ops.
887     if job.ops_iter is None:
888       job.ops_iter = enumerate(job.ops)
889
890     # Find next opcode to run
891     while True:
892       try:
893         (idx, op) = job.ops_iter.next()
894       except StopIteration:
895         raise errors.ProgrammerError("Called for a finished job")
896
897       if op.status == constants.OP_STATUS_RUNNING:
898         # Found an opcode already marked as running
899         raise errors.ProgrammerError("Called for job marked as running")
900
901       opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
902                              timeout_strategy_factory)
903
904       if op.status not in constants.OPS_FINALIZED:
905         return opctx
906
907       # This is a job that was partially completed before master daemon
908       # shutdown, so it can be expected that some opcodes are already
909       # completed successfully (if any did error out, then the whole job
910       # should have been aborted and not resubmitted for processing).
911       logging.info("%s: opcode %s already processed, skipping",
912                    opctx.log_prefix, opctx.summary)
913
914   @staticmethod
915   def _MarkWaitlock(job, op):
916     """Marks an opcode as waiting for locks.
917
918     The job's start timestamp is also set if necessary.
919
920     @type job: L{_QueuedJob}
921     @param job: Job object
922     @type op: L{_QueuedOpCode}
923     @param op: Opcode object
924
925     """
926     assert op in job.ops
927     assert op.status in (constants.OP_STATUS_QUEUED,
928                          constants.OP_STATUS_WAITING)
929
930     update = False
931
932     op.result = None
933
934     if op.status == constants.OP_STATUS_QUEUED:
935       op.status = constants.OP_STATUS_WAITING
936       update = True
937
938     if op.start_timestamp is None:
939       op.start_timestamp = TimeStampNow()
940       update = True
941
942     if job.start_timestamp is None:
943       job.start_timestamp = op.start_timestamp
944       update = True
945
946     assert op.status == constants.OP_STATUS_WAITING
947
948     return update
949
950   @staticmethod
951   def _CheckDependencies(queue, job, opctx):
952     """Checks if an opcode has dependencies and if so, processes them.
953
954     @type queue: L{JobQueue}
955     @param queue: Queue object
956     @type job: L{_QueuedJob}
957     @param job: Job object
958     @type opctx: L{_OpExecContext}
959     @param opctx: Opcode execution context
960     @rtype: bool
961     @return: Whether opcode will be re-scheduled by dependency tracker
962
963     """
964     op = opctx.op
965
966     result = False
967
968     while opctx.jobdeps:
969       (dep_job_id, dep_status) = opctx.jobdeps[0]
970
971       (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
972                                                           dep_status)
973       assert ht.TNonEmptyString(depmsg), "No dependency message"
974
975       logging.info("%s: %s", opctx.log_prefix, depmsg)
976
977       if depresult == _JobDependencyManager.CONTINUE:
978         # Remove dependency and continue
979         opctx.jobdeps.pop(0)
980
981       elif depresult == _JobDependencyManager.WAIT:
982         # Need to wait for notification, dependency tracker will re-add job
983         # to workerpool
984         result = True
985         break
986
987       elif depresult == _JobDependencyManager.CANCEL:
988         # Job was cancelled, cancel this job as well
989         job.Cancel()
990         assert op.status == constants.OP_STATUS_CANCELING
991         break
992
993       elif depresult in (_JobDependencyManager.WRONGSTATUS,
994                          _JobDependencyManager.ERROR):
995         # Job failed or there was an error, this job must fail
996         op.status = constants.OP_STATUS_ERROR
997         op.result = _EncodeOpError(errors.OpExecError(depmsg))
998         break
999
1000       else:
1001         raise errors.ProgrammerError("Unknown dependency result '%s'" %
1002                                      depresult)
1003
1004     return result
1005
1006   def _ExecOpCodeUnlocked(self, opctx):
1007     """Processes one opcode and returns the result.
1008
1009     """
1010     op = opctx.op
1011
1012     assert op.status == constants.OP_STATUS_WAITING
1013
1014     timeout = opctx.GetNextLockTimeout()
1015
1016     try:
1017       # Make sure not to hold queue lock while calling ExecOpCode
1018       result = self.opexec_fn(op.input,
1019                               _OpExecCallbacks(self.queue, self.job, op),
1020                               timeout=timeout, priority=op.priority)
1021     except mcpu.LockAcquireTimeout:
1022       assert timeout is not None, "Received timeout for blocking acquire"
1023       logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1024
1025       assert op.status in (constants.OP_STATUS_WAITING,
1026                            constants.OP_STATUS_CANCELING)
1027
1028       # Was job cancelled while we were waiting for the lock?
1029       if op.status == constants.OP_STATUS_CANCELING:
1030         return (constants.OP_STATUS_CANCELING, None)
1031
1032       # Stay in waitlock while trying to re-acquire lock
1033       return (constants.OP_STATUS_WAITING, None)
1034     except CancelJob:
1035       logging.exception("%s: Canceling job", opctx.log_prefix)
1036       assert op.status == constants.OP_STATUS_CANCELING
1037       return (constants.OP_STATUS_CANCELING, None)
1038     except Exception, err: # pylint: disable=W0703
1039       logging.exception("%s: Caught exception in %s",
1040                         opctx.log_prefix, opctx.summary)
1041       return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1042     else:
1043       logging.debug("%s: %s successful",
1044                     opctx.log_prefix, opctx.summary)
1045       return (constants.OP_STATUS_SUCCESS, result)
1046
1047   def __call__(self, _nextop_fn=None):
1048     """Continues execution of a job.
1049
1050     @param _nextop_fn: Callback function for tests
1051     @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1052       be deferred and C{WAITDEP} if the dependency manager
1053       (L{_JobDependencyManager}) will re-schedule the job when appropriate
1054
1055     """
1056     queue = self.queue
1057     job = self.job
1058
1059     logging.debug("Processing job %s", job.id)
1060
1061     queue.acquire(shared=1)
1062     try:
1063       opcount = len(job.ops)
1064
1065       assert job.writable, "Expected writable job"
1066
1067       # Don't do anything for finalized jobs
1068       if job.CalcStatus() in constants.JOBS_FINALIZED:
1069         return self.FINISHED
1070
1071       # Is a previous opcode still pending?
1072       if job.cur_opctx:
1073         opctx = job.cur_opctx
1074         job.cur_opctx = None
1075       else:
1076         if __debug__ and _nextop_fn:
1077           _nextop_fn()
1078         opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1079
1080       op = opctx.op
1081
1082       # Consistency check
1083       assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1084                                      constants.OP_STATUS_CANCELING)
1085                         for i in job.ops[opctx.index + 1:])
1086
1087       assert op.status in (constants.OP_STATUS_QUEUED,
1088                            constants.OP_STATUS_WAITING,
1089                            constants.OP_STATUS_CANCELING)
1090
1091       assert (op.priority <= constants.OP_PRIO_LOWEST and
1092               op.priority >= constants.OP_PRIO_HIGHEST)
1093
1094       waitjob = None
1095
1096       if op.status != constants.OP_STATUS_CANCELING:
1097         assert op.status in (constants.OP_STATUS_QUEUED,
1098                              constants.OP_STATUS_WAITING)
1099
1100         # Prepare to start opcode
1101         if self._MarkWaitlock(job, op):
1102           # Write to disk
1103           queue.UpdateJobUnlocked(job)
1104
1105         assert op.status == constants.OP_STATUS_WAITING
1106         assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1107         assert job.start_timestamp and op.start_timestamp
1108         assert waitjob is None
1109
1110         # Check if waiting for a job is necessary
1111         waitjob = self._CheckDependencies(queue, job, opctx)
1112
1113         assert op.status in (constants.OP_STATUS_WAITING,
1114                              constants.OP_STATUS_CANCELING,
1115                              constants.OP_STATUS_ERROR)
1116
1117         if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1118                                          constants.OP_STATUS_ERROR)):
1119           logging.info("%s: opcode %s waiting for locks",
1120                        opctx.log_prefix, opctx.summary)
1121
1122           assert not opctx.jobdeps, "Not all dependencies were removed"
1123
1124           queue.release()
1125           try:
1126             (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1127           finally:
1128             queue.acquire(shared=1)
1129
1130           op.status = op_status
1131           op.result = op_result
1132
1133           assert not waitjob
1134
1135         if op.status == constants.OP_STATUS_WAITING:
1136           # Couldn't get locks in time
1137           assert not op.end_timestamp
1138         else:
1139           # Finalize opcode
1140           op.end_timestamp = TimeStampNow()
1141
1142           if op.status == constants.OP_STATUS_CANCELING:
1143             assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1144                                   for i in job.ops[opctx.index:])
1145           else:
1146             assert op.status in constants.OPS_FINALIZED
1147
1148       if op.status == constants.OP_STATUS_WAITING or waitjob:
1149         finalize = False
1150
1151         if not waitjob and opctx.CheckPriorityIncrease():
1152           # Priority was changed, need to update on-disk file
1153           queue.UpdateJobUnlocked(job)
1154
1155         # Keep around for another round
1156         job.cur_opctx = opctx
1157
1158         assert (op.priority <= constants.OP_PRIO_LOWEST and
1159                 op.priority >= constants.OP_PRIO_HIGHEST)
1160
1161         # In no case must the status be finalized here
1162         assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1163
1164       else:
1165         # Ensure all opcodes so far have been successful
1166         assert (opctx.index == 0 or
1167                 compat.all(i.status == constants.OP_STATUS_SUCCESS
1168                            for i in job.ops[:opctx.index]))
1169
1170         # Reset context
1171         job.cur_opctx = None
1172
1173         if op.status == constants.OP_STATUS_SUCCESS:
1174           finalize = False
1175
1176         elif op.status == constants.OP_STATUS_ERROR:
1177           # Ensure failed opcode has an exception as its result
1178           assert errors.GetEncodedError(job.ops[opctx.index].result)
1179
1180           to_encode = errors.OpExecError("Preceding opcode failed")
1181           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1182                                 _EncodeOpError(to_encode))
1183           finalize = True
1184
1185           # Consistency check
1186           assert compat.all(i.status == constants.OP_STATUS_ERROR and
1187                             errors.GetEncodedError(i.result)
1188                             for i in job.ops[opctx.index:])
1189
1190         elif op.status == constants.OP_STATUS_CANCELING:
1191           job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1192                                 "Job canceled by request")
1193           finalize = True
1194
1195         else:
1196           raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1197
1198         if opctx.index == (opcount - 1):
1199           # Finalize on last opcode
1200           finalize = True
1201
1202         if finalize:
1203           # All opcodes have been run, finalize job
1204           job.Finalize()
1205
1206         # Write to disk. If the job status is final, this is the final write
1207         # allowed. Once the file has been written, it can be archived anytime.
1208         queue.UpdateJobUnlocked(job)
1209
1210         assert not waitjob
1211
1212         if finalize:
1213           logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1214           return self.FINISHED
1215
1216       assert not waitjob or queue.depmgr.JobWaiting(job)
1217
1218       if waitjob:
1219         return self.WAITDEP
1220       else:
1221         return self.DEFER
1222     finally:
1223       assert job.writable, "Job became read-only while being processed"
1224       queue.release()
1225
1226
1227 def _EvaluateJobProcessorResult(depmgr, job, result):
1228   """Looks at a result from L{_JobProcessor} for a job.
1229
1230   To be used in a L{_JobQueueWorker}.
1231
1232   """
1233   if result == _JobProcessor.FINISHED:
1234     # Notify waiting jobs
1235     depmgr.NotifyWaiters(job.id)
1236
1237   elif result == _JobProcessor.DEFER:
1238     # Schedule again
1239     raise workerpool.DeferTask(priority=job.CalcPriority())
1240
1241   elif result == _JobProcessor.WAITDEP:
1242     # No-op, dependency manager will re-schedule
1243     pass
1244
1245   else:
1246     raise errors.ProgrammerError("Job processor returned unknown status %s" %
1247                                  (result, ))
1248
1249
1250 class _JobQueueWorker(workerpool.BaseWorker):
1251   """The actual job workers.
1252
1253   """
1254   def RunTask(self, job): # pylint: disable=W0221
1255     """Job executor.
1256
1257     @type job: L{_QueuedJob}
1258     @param job: the job to be processed
1259
1260     """
1261     assert job.writable, "Expected writable job"
1262
1263     # Ensure only one worker is active on a single job. If a job registers for
1264     # a dependency job, and the other job notifies before the first worker is
1265     # done, the job can end up in the tasklist more than once.
1266     job.processor_lock.acquire()
1267     try:
1268       return self._RunTaskInner(job)
1269     finally:
1270       job.processor_lock.release()
1271
1272   def _RunTaskInner(self, job):
1273     """Executes a job.
1274
1275     Must be called with per-job lock acquired.
1276
1277     """
1278     queue = job.queue
1279     assert queue == self.pool.queue
1280
1281     setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1282     setname_fn(None)
1283
1284     proc = mcpu.Processor(queue.context, job.id)
1285
1286     # Create wrapper for setting thread name
1287     wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1288                                     proc.ExecOpCode)
1289
1290     _EvaluateJobProcessorResult(queue.depmgr, job,
1291                                 _JobProcessor(queue, wrap_execop_fn, job)())
1292
1293   @staticmethod
1294   def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1295     """Updates the worker thread name to include a short summary of the opcode.
1296
1297     @param setname_fn: Callable setting worker thread name
1298     @param execop_fn: Callable for executing opcode (usually
1299                       L{mcpu.Processor.ExecOpCode})
1300
1301     """
1302     setname_fn(op)
1303     try:
1304       return execop_fn(op, *args, **kwargs)
1305     finally:
1306       setname_fn(None)
1307
1308   @staticmethod
1309   def _GetWorkerName(job, op):
1310     """Sets the worker thread name.
1311
1312     @type job: L{_QueuedJob}
1313     @type op: L{opcodes.OpCode}
1314
1315     """
1316     parts = ["Job%s" % job.id]
1317
1318     if op:
1319       parts.append(op.TinySummary())
1320
1321     return "/".join(parts)
1322
1323
1324 class _JobQueueWorkerPool(workerpool.WorkerPool):
1325   """Simple class implementing a job-processing workerpool.
1326
1327   """
1328   def __init__(self, queue):
1329     super(_JobQueueWorkerPool, self).__init__("Jq",
1330                                               JOBQUEUE_THREADS,
1331                                               _JobQueueWorker)
1332     self.queue = queue
1333
1334
1335 class _JobDependencyManager:
1336   """Keeps track of job dependencies.
1337
1338   """
1339   (WAIT,
1340    ERROR,
1341    CANCEL,
1342    CONTINUE,
1343    WRONGSTATUS) = range(1, 6)
1344
1345   def __init__(self, getstatus_fn, enqueue_fn):
1346     """Initializes this class.
1347
1348     """
1349     self._getstatus_fn = getstatus_fn
1350     self._enqueue_fn = enqueue_fn
1351
1352     self._waiters = {}
1353     self._lock = locking.SharedLock("JobDepMgr")
1354
1355   @locking.ssynchronized(_LOCK, shared=1)
1356   def GetLockInfo(self, requested): # pylint: disable=W0613
1357     """Retrieves information about waiting jobs.
1358
1359     @type requested: set
1360     @param requested: Requested information, see C{query.LQ_*}
1361
1362     """
1363     # No need to sort here, that's being done by the lock manager and query
1364     # library. There are no priorities for notifying jobs, hence all show up as
1365     # one item under "pending".
1366     return [("job/%s" % job_id, None, None,
1367              [("job", [job.id for job in waiters])])
1368             for job_id, waiters in self._waiters.items()
1369             if waiters]
1370
1371   @locking.ssynchronized(_LOCK, shared=1)
1372   def JobWaiting(self, job):
1373     """Checks if a job is waiting.
1374
1375     """
1376     return compat.any(job in jobs
1377                       for jobs in self._waiters.values())
1378
1379   @locking.ssynchronized(_LOCK)
1380   def CheckAndRegister(self, job, dep_job_id, dep_status):
1381     """Checks if a dependency job has the requested status.
1382
1383     If the other job is not yet in a finalized status, the calling job will be
1384     notified (re-added to the workerpool) at a later point.
1385
1386     @type job: L{_QueuedJob}
1387     @param job: Job object
1388     @type dep_job_id: string
1389     @param dep_job_id: ID of dependency job
1390     @type dep_status: list
1391     @param dep_status: Required status
1392
1393     """
1394     assert ht.TString(job.id)
1395     assert ht.TString(dep_job_id)
1396     assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1397
1398     if job.id == dep_job_id:
1399       return (self.ERROR, "Job can't depend on itself")
1400
1401     # Get status of dependency job
1402     try:
1403       status = self._getstatus_fn(dep_job_id)
1404     except errors.JobLost, err:
1405       return (self.ERROR, "Dependency error: %s" % err)
1406
1407     assert status in constants.JOB_STATUS_ALL
1408
1409     job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1410
1411     if status not in constants.JOBS_FINALIZED:
1412       # Register for notification and wait for job to finish
1413       job_id_waiters.add(job)
1414       return (self.WAIT,
1415               "Need to wait for job %s, wanted status '%s'" %
1416               (dep_job_id, dep_status))
1417
1418     # Remove from waiters list
1419     if job in job_id_waiters:
1420       job_id_waiters.remove(job)
1421
1422     if (status == constants.JOB_STATUS_CANCELED and
1423         constants.JOB_STATUS_CANCELED not in dep_status):
1424       return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1425
1426     elif not dep_status or status in dep_status:
1427       return (self.CONTINUE,
1428               "Dependency job %s finished with status '%s'" %
1429               (dep_job_id, status))
1430
1431     else:
1432       return (self.WRONGSTATUS,
1433               "Dependency job %s finished with status '%s',"
1434               " not one of '%s' as required" %
1435               (dep_job_id, status, utils.CommaJoin(dep_status)))
1436
1437   def _RemoveEmptyWaitersUnlocked(self):
1438     """Remove all jobs without actual waiters.
1439
1440     """
1441     for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1442                    if not waiters]:
1443       del self._waiters[job_id]
1444
1445   def NotifyWaiters(self, job_id):
1446     """Notifies all jobs waiting for a certain job ID.
1447
1448     @attention: Do not call until L{CheckAndRegister} returned a status other
1449       than C{WAITDEP} for C{job_id}, or behaviour is undefined
1450     @type job_id: string
1451     @param job_id: Job ID
1452
1453     """
1454     assert ht.TString(job_id)
1455
1456     self._lock.acquire()
1457     try:
1458       self._RemoveEmptyWaitersUnlocked()
1459
1460       jobs = self._waiters.pop(job_id, None)
1461     finally:
1462       self._lock.release()
1463
1464     if jobs:
1465       # Re-add jobs to workerpool
1466       logging.debug("Re-adding %s jobs which were waiting for job %s",
1467                     len(jobs), job_id)
1468       self._enqueue_fn(jobs)
1469
1470
1471 def _RequireOpenQueue(fn):
1472   """Decorator for "public" functions.
1473
1474   This function should be used for all 'public' functions. That is,
1475   functions usually called from other classes. Note that this should
1476   be applied only to methods (not plain functions), since it expects
1477   that the decorated function is called with a first argument that has
1478   a '_queue_filelock' argument.
1479
1480   @warning: Use this decorator only after locking.ssynchronized
1481
1482   Example::
1483     @locking.ssynchronized(_LOCK)
1484     @_RequireOpenQueue
1485     def Example(self):
1486       pass
1487
1488   """
1489   def wrapper(self, *args, **kwargs):
1490     # pylint: disable=W0212
1491     assert self._queue_filelock is not None, "Queue should be open"
1492     return fn(self, *args, **kwargs)
1493   return wrapper
1494
1495
1496 def _RequireNonDrainedQueue(fn):
1497   """Decorator checking for a non-drained queue.
1498
1499   To be used with functions submitting new jobs.
1500
1501   """
1502   def wrapper(self, *args, **kwargs):
1503     """Wrapper function.
1504
1505     @raise errors.JobQueueDrainError: if the job queue is marked for draining
1506
1507     """
1508     # Ok when sharing the big job queue lock, as the drain file is created when
1509     # the lock is exclusive.
1510     # Needs access to protected member, pylint: disable=W0212
1511     if self._drained:
1512       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1513
1514     if not self._accepting_jobs:
1515       raise errors.JobQueueError("Job queue is shutting down, refusing job")
1516
1517     return fn(self, *args, **kwargs)
1518   return wrapper
1519
1520
1521 class JobQueue(object):
1522   """Queue used to manage the jobs.
1523
1524   """
1525   def __init__(self, context):
1526     """Constructor for JobQueue.
1527
1528     The constructor will initialize the job queue object and then
1529     start loading the current jobs from disk, either for starting them
1530     (if they were queue) or for aborting them (if they were already
1531     running).
1532
1533     @type context: GanetiContext
1534     @param context: the context object for access to the configuration
1535         data and other ganeti objects
1536
1537     """
1538     self.context = context
1539     self._memcache = weakref.WeakValueDictionary()
1540     self._my_hostname = netutils.Hostname.GetSysName()
1541
1542     # The Big JobQueue lock. If a code block or method acquires it in shared
1543     # mode safe it must guarantee concurrency with all the code acquiring it in
1544     # shared mode, including itself. In order not to acquire it at all
1545     # concurrency must be guaranteed with all code acquiring it in shared mode
1546     # and all code acquiring it exclusively.
1547     self._lock = locking.SharedLock("JobQueue")
1548
1549     self.acquire = self._lock.acquire
1550     self.release = self._lock.release
1551
1552     # Accept jobs by default
1553     self._accepting_jobs = True
1554
1555     # Initialize the queue, and acquire the filelock.
1556     # This ensures no other process is working on the job queue.
1557     self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1558
1559     # Read serial file
1560     self._last_serial = jstore.ReadSerial()
1561     assert self._last_serial is not None, ("Serial file was modified between"
1562                                            " check in jstore and here")
1563
1564     # Get initial list of nodes
1565     self._nodes = dict((n.name, n.primary_ip)
1566                        for n in self.context.cfg.GetAllNodesInfo().values()
1567                        if n.master_candidate)
1568
1569     # Remove master node
1570     self._nodes.pop(self._my_hostname, None)
1571
1572     # TODO: Check consistency across nodes
1573
1574     self._queue_size = None
1575     self._UpdateQueueSizeUnlocked()
1576     assert ht.TInt(self._queue_size)
1577     self._drained = jstore.CheckDrainFlag()
1578
1579     # Job dependencies
1580     self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1581                                         self._EnqueueJobs)
1582     self.context.glm.AddToLockMonitor(self.depmgr)
1583
1584     # Setup worker pool
1585     self._wpool = _JobQueueWorkerPool(self)
1586     try:
1587       self._InspectQueue()
1588     except:
1589       self._wpool.TerminateWorkers()
1590       raise
1591
1592   @locking.ssynchronized(_LOCK)
1593   @_RequireOpenQueue
1594   def _InspectQueue(self):
1595     """Loads the whole job queue and resumes unfinished jobs.
1596
1597     This function needs the lock here because WorkerPool.AddTask() may start a
1598     job while we're still doing our work.
1599
1600     """
1601     logging.info("Inspecting job queue")
1602
1603     restartjobs = []
1604
1605     all_job_ids = self._GetJobIDsUnlocked()
1606     jobs_count = len(all_job_ids)
1607     lastinfo = time.time()
1608     for idx, job_id in enumerate(all_job_ids):
1609       # Give an update every 1000 jobs or 10 seconds
1610       if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1611           idx == (jobs_count - 1)):
1612         logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1613                      idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1614         lastinfo = time.time()
1615
1616       job = self._LoadJobUnlocked(job_id)
1617
1618       # a failure in loading the job can cause 'None' to be returned
1619       if job is None:
1620         continue
1621
1622       status = job.CalcStatus()
1623
1624       if status == constants.JOB_STATUS_QUEUED:
1625         restartjobs.append(job)
1626
1627       elif status in (constants.JOB_STATUS_RUNNING,
1628                       constants.JOB_STATUS_WAITING,
1629                       constants.JOB_STATUS_CANCELING):
1630         logging.warning("Unfinished job %s found: %s", job.id, job)
1631
1632         if status == constants.JOB_STATUS_WAITING:
1633           # Restart job
1634           job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1635           restartjobs.append(job)
1636         else:
1637           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1638                                 "Unclean master daemon shutdown")
1639           job.Finalize()
1640
1641         self.UpdateJobUnlocked(job)
1642
1643     if restartjobs:
1644       logging.info("Restarting %s jobs", len(restartjobs))
1645       self._EnqueueJobsUnlocked(restartjobs)
1646
1647     logging.info("Job queue inspection finished")
1648
1649   def _GetRpc(self, address_list):
1650     """Gets RPC runner with context.
1651
1652     """
1653     return rpc.JobQueueRunner(self.context, address_list)
1654
1655   @locking.ssynchronized(_LOCK)
1656   @_RequireOpenQueue
1657   def AddNode(self, node):
1658     """Register a new node with the queue.
1659
1660     @type node: L{objects.Node}
1661     @param node: the node object to be added
1662
1663     """
1664     node_name = node.name
1665     assert node_name != self._my_hostname
1666
1667     # Clean queue directory on added node
1668     result = self._GetRpc(None).call_jobqueue_purge(node_name)
1669     msg = result.fail_msg
1670     if msg:
1671       logging.warning("Cannot cleanup queue directory on node %s: %s",
1672                       node_name, msg)
1673
1674     if not node.master_candidate:
1675       # remove if existing, ignoring errors
1676       self._nodes.pop(node_name, None)
1677       # and skip the replication of the job ids
1678       return
1679
1680     # Upload the whole queue excluding archived jobs
1681     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1682
1683     # Upload current serial file
1684     files.append(constants.JOB_QUEUE_SERIAL_FILE)
1685
1686     # Static address list
1687     addrs = [node.primary_ip]
1688
1689     for file_name in files:
1690       # Read file content
1691       content = utils.ReadFile(file_name)
1692
1693       result = self._GetRpc(addrs).call_jobqueue_update([node_name], file_name,
1694                                                         content)
1695       msg = result[node_name].fail_msg
1696       if msg:
1697         logging.error("Failed to upload file %s to node %s: %s",
1698                       file_name, node_name, msg)
1699
1700     self._nodes[node_name] = node.primary_ip
1701
1702   @locking.ssynchronized(_LOCK)
1703   @_RequireOpenQueue
1704   def RemoveNode(self, node_name):
1705     """Callback called when removing nodes from the cluster.
1706
1707     @type node_name: str
1708     @param node_name: the name of the node to remove
1709
1710     """
1711     self._nodes.pop(node_name, None)
1712
1713   @staticmethod
1714   def _CheckRpcResult(result, nodes, failmsg):
1715     """Verifies the status of an RPC call.
1716
1717     Since we aim to keep consistency should this node (the current
1718     master) fail, we will log errors if our rpc fail, and especially
1719     log the case when more than half of the nodes fails.
1720
1721     @param result: the data as returned from the rpc call
1722     @type nodes: list
1723     @param nodes: the list of nodes we made the call to
1724     @type failmsg: str
1725     @param failmsg: the identifier to be used for logging
1726
1727     """
1728     failed = []
1729     success = []
1730
1731     for node in nodes:
1732       msg = result[node].fail_msg
1733       if msg:
1734         failed.append(node)
1735         logging.error("RPC call %s (%s) failed on node %s: %s",
1736                       result[node].call, failmsg, node, msg)
1737       else:
1738         success.append(node)
1739
1740     # +1 for the master node
1741     if (len(success) + 1) < len(failed):
1742       # TODO: Handle failing nodes
1743       logging.error("More than half of the nodes failed")
1744
1745   def _GetNodeIp(self):
1746     """Helper for returning the node name/ip list.
1747
1748     @rtype: (list, list)
1749     @return: a tuple of two lists, the first one with the node
1750         names and the second one with the node addresses
1751
1752     """
1753     # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1754     name_list = self._nodes.keys()
1755     addr_list = [self._nodes[name] for name in name_list]
1756     return name_list, addr_list
1757
1758   def _UpdateJobQueueFile(self, file_name, data, replicate):
1759     """Writes a file locally and then replicates it to all nodes.
1760
1761     This function will replace the contents of a file on the local
1762     node and then replicate it to all the other nodes we have.
1763
1764     @type file_name: str
1765     @param file_name: the path of the file to be replicated
1766     @type data: str
1767     @param data: the new contents of the file
1768     @type replicate: boolean
1769     @param replicate: whether to spread the changes to the remote nodes
1770
1771     """
1772     getents = runtime.GetEnts()
1773     utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1774                     gid=getents.masterd_gid)
1775
1776     if replicate:
1777       names, addrs = self._GetNodeIp()
1778       result = self._GetRpc(addrs).call_jobqueue_update(names, file_name, data)
1779       self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1780
1781   def _RenameFilesUnlocked(self, rename):
1782     """Renames a file locally and then replicate the change.
1783
1784     This function will rename a file in the local queue directory
1785     and then replicate this rename to all the other nodes we have.
1786
1787     @type rename: list of (old, new)
1788     @param rename: List containing tuples mapping old to new names
1789
1790     """
1791     # Rename them locally
1792     for old, new in rename:
1793       utils.RenameFile(old, new, mkdir=True)
1794
1795     # ... and on all nodes
1796     names, addrs = self._GetNodeIp()
1797     result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
1798     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1799
1800   @staticmethod
1801   def _FormatJobID(job_id):
1802     """Convert a job ID to string format.
1803
1804     Currently this just does C{str(job_id)} after performing some
1805     checks, but if we want to change the job id format this will
1806     abstract this change.
1807
1808     @type job_id: int or long
1809     @param job_id: the numeric job id
1810     @rtype: str
1811     @return: the formatted job id
1812
1813     """
1814     if not isinstance(job_id, (int, long)):
1815       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1816     if job_id < 0:
1817       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1818
1819     return str(job_id)
1820
1821   @classmethod
1822   def _GetArchiveDirectory(cls, job_id):
1823     """Returns the archive directory for a job.
1824
1825     @type job_id: str
1826     @param job_id: Job identifier
1827     @rtype: str
1828     @return: Directory name
1829
1830     """
1831     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
1832
1833   def _NewSerialsUnlocked(self, count):
1834     """Generates a new job identifier.
1835
1836     Job identifiers are unique during the lifetime of a cluster.
1837
1838     @type count: integer
1839     @param count: how many serials to return
1840     @rtype: str
1841     @return: a string representing the job identifier.
1842
1843     """
1844     assert ht.TPositiveInt(count)
1845
1846     # New number
1847     serial = self._last_serial + count
1848
1849     # Write to file
1850     self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1851                              "%s\n" % serial, True)
1852
1853     result = [self._FormatJobID(v)
1854               for v in range(self._last_serial + 1, serial + 1)]
1855
1856     # Keep it only if we were able to write the file
1857     self._last_serial = serial
1858
1859     assert len(result) == count
1860
1861     return result
1862
1863   @staticmethod
1864   def _GetJobPath(job_id):
1865     """Returns the job file for a given job id.
1866
1867     @type job_id: str
1868     @param job_id: the job identifier
1869     @rtype: str
1870     @return: the path to the job file
1871
1872     """
1873     return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1874
1875   @classmethod
1876   def _GetArchivedJobPath(cls, job_id):
1877     """Returns the archived job file for a give job id.
1878
1879     @type job_id: str
1880     @param job_id: the job identifier
1881     @rtype: str
1882     @return: the path to the archived job file
1883
1884     """
1885     return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1886                           cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1887
1888   @staticmethod
1889   def _GetJobIDsUnlocked(sort=True):
1890     """Return all known job IDs.
1891
1892     The method only looks at disk because it's a requirement that all
1893     jobs are present on disk (so in the _memcache we don't have any
1894     extra IDs).
1895
1896     @type sort: boolean
1897     @param sort: perform sorting on the returned job ids
1898     @rtype: list
1899     @return: the list of job IDs
1900
1901     """
1902     jlist = []
1903     for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1904       m = constants.JOB_FILE_RE.match(filename)
1905       if m:
1906         jlist.append(m.group(1))
1907     if sort:
1908       jlist = utils.NiceSort(jlist)
1909     return jlist
1910
1911   def _LoadJobUnlocked(self, job_id):
1912     """Loads a job from the disk or memory.
1913
1914     Given a job id, this will return the cached job object if
1915     existing, or try to load the job from the disk. If loading from
1916     disk, it will also add the job to the cache.
1917
1918     @param job_id: the job id
1919     @rtype: L{_QueuedJob} or None
1920     @return: either None or the job object
1921
1922     """
1923     job = self._memcache.get(job_id, None)
1924     if job:
1925       logging.debug("Found job %s in memcache", job_id)
1926       assert job.writable, "Found read-only job in memcache"
1927       return job
1928
1929     try:
1930       job = self._LoadJobFromDisk(job_id, False)
1931       if job is None:
1932         return job
1933     except errors.JobFileCorrupted:
1934       old_path = self._GetJobPath(job_id)
1935       new_path = self._GetArchivedJobPath(job_id)
1936       if old_path == new_path:
1937         # job already archived (future case)
1938         logging.exception("Can't parse job %s", job_id)
1939       else:
1940         # non-archived case
1941         logging.exception("Can't parse job %s, will archive.", job_id)
1942         self._RenameFilesUnlocked([(old_path, new_path)])
1943       return None
1944
1945     assert job.writable, "Job just loaded is not writable"
1946
1947     self._memcache[job_id] = job
1948     logging.debug("Added job %s to the cache", job_id)
1949     return job
1950
1951   def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
1952     """Load the given job file from disk.
1953
1954     Given a job file, read, load and restore it in a _QueuedJob format.
1955
1956     @type job_id: string
1957     @param job_id: job identifier
1958     @type try_archived: bool
1959     @param try_archived: Whether to try loading an archived job
1960     @rtype: L{_QueuedJob} or None
1961     @return: either None or the job object
1962
1963     """
1964     path_functions = [(self._GetJobPath, True)]
1965
1966     if try_archived:
1967       path_functions.append((self._GetArchivedJobPath, False))
1968
1969     raw_data = None
1970     writable_default = None
1971
1972     for (fn, writable_default) in path_functions:
1973       filepath = fn(job_id)
1974       logging.debug("Loading job from %s", filepath)
1975       try:
1976         raw_data = utils.ReadFile(filepath)
1977       except EnvironmentError, err:
1978         if err.errno != errno.ENOENT:
1979           raise
1980       else:
1981         break
1982
1983     if not raw_data:
1984       return None
1985
1986     if writable is None:
1987       writable = writable_default
1988
1989     try:
1990       data = serializer.LoadJson(raw_data)
1991       job = _QueuedJob.Restore(self, data, writable)
1992     except Exception, err: # pylint: disable=W0703
1993       raise errors.JobFileCorrupted(err)
1994
1995     return job
1996
1997   def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
1998     """Load the given job file from disk.
1999
2000     Given a job file, read, load and restore it in a _QueuedJob format.
2001     In case of error reading the job, it gets returned as None, and the
2002     exception is logged.
2003
2004     @type job_id: string
2005     @param job_id: job identifier
2006     @type try_archived: bool
2007     @param try_archived: Whether to try loading an archived job
2008     @rtype: L{_QueuedJob} or None
2009     @return: either None or the job object
2010
2011     """
2012     try:
2013       return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
2014     except (errors.JobFileCorrupted, EnvironmentError):
2015       logging.exception("Can't load/parse job %s", job_id)
2016       return None
2017
2018   def _UpdateQueueSizeUnlocked(self):
2019     """Update the queue size.
2020
2021     """
2022     self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
2023
2024   @locking.ssynchronized(_LOCK)
2025   @_RequireOpenQueue
2026   def SetDrainFlag(self, drain_flag):
2027     """Sets the drain flag for the queue.
2028
2029     @type drain_flag: boolean
2030     @param drain_flag: Whether to set or unset the drain flag
2031
2032     """
2033     jstore.SetDrainFlag(drain_flag)
2034
2035     self._drained = drain_flag
2036
2037     return True
2038
2039   @_RequireOpenQueue
2040   def _SubmitJobUnlocked(self, job_id, ops):
2041     """Create and store a new job.
2042
2043     This enters the job into our job queue and also puts it on the new
2044     queue, in order for it to be picked up by the queue processors.
2045
2046     @type job_id: job ID
2047     @param job_id: the job ID for the new job
2048     @type ops: list
2049     @param ops: The list of OpCodes that will become the new job.
2050     @rtype: L{_QueuedJob}
2051     @return: the job object to be queued
2052     @raise errors.JobQueueFull: if the job queue has too many jobs in it
2053     @raise errors.GenericError: If an opcode is not valid
2054
2055     """
2056     if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
2057       raise errors.JobQueueFull()
2058
2059     job = _QueuedJob(self, job_id, ops, True)
2060
2061     # Check priority
2062     for idx, op in enumerate(job.ops):
2063       if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
2064         allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2065         raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
2066                                   " are %s" % (idx, op.priority, allowed))
2067
2068       dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
2069       if not opcodes.TNoRelativeJobDependencies(dependencies):
2070         raise errors.GenericError("Opcode %s has invalid dependencies, must"
2071                                   " match %s: %s" %
2072                                   (idx, opcodes.TNoRelativeJobDependencies,
2073                                    dependencies))
2074
2075     # Write to disk
2076     self.UpdateJobUnlocked(job)
2077
2078     self._queue_size += 1
2079
2080     logging.debug("Adding new job %s to the cache", job_id)
2081     self._memcache[job_id] = job
2082
2083     return job
2084
2085   @locking.ssynchronized(_LOCK)
2086   @_RequireOpenQueue
2087   @_RequireNonDrainedQueue
2088   def SubmitJob(self, ops):
2089     """Create and store a new job.
2090
2091     @see: L{_SubmitJobUnlocked}
2092
2093     """
2094     (job_id, ) = self._NewSerialsUnlocked(1)
2095     self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
2096     return job_id
2097
2098   @locking.ssynchronized(_LOCK)
2099   @_RequireOpenQueue
2100   @_RequireNonDrainedQueue
2101   def SubmitManyJobs(self, jobs):
2102     """Create and store multiple jobs.
2103
2104     @see: L{_SubmitJobUnlocked}
2105
2106     """
2107     all_job_ids = self._NewSerialsUnlocked(len(jobs))
2108
2109     (results, added_jobs) = \
2110       self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])
2111
2112     self._EnqueueJobsUnlocked(added_jobs)
2113
2114     return results
2115
2116   @staticmethod
2117   def _FormatSubmitError(msg, ops):
2118     """Formats errors which occurred while submitting a job.
2119
2120     """
2121     return ("%s; opcodes %s" %
2122             (msg, utils.CommaJoin(op.Summary() for op in ops)))
2123
2124   @staticmethod
2125   def _ResolveJobDependencies(resolve_fn, deps):
2126     """Resolves relative job IDs in dependencies.
2127
2128     @type resolve_fn: callable
2129     @param resolve_fn: Function to resolve a relative job ID
2130     @type deps: list
2131     @param deps: Dependencies
2132     @rtype: list
2133     @return: Resolved dependencies
2134
2135     """
2136     result = []
2137
2138     for (dep_job_id, dep_status) in deps:
2139       if ht.TRelativeJobId(dep_job_id):
2140         assert ht.TInt(dep_job_id) and dep_job_id < 0
2141         try:
2142           job_id = resolve_fn(dep_job_id)
2143         except IndexError:
2144           # Abort
2145           return (False, "Unable to resolve relative job ID %s" % dep_job_id)
2146       else:
2147         job_id = dep_job_id
2148
2149       result.append((job_id, dep_status))
2150
2151     return (True, result)
2152
2153   def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
2154     """Create and store multiple jobs.
2155
2156     @see: L{_SubmitJobUnlocked}
2157
2158     """
2159     results = []
2160     added_jobs = []
2161
2162     def resolve_fn(job_idx, reljobid):
2163       assert reljobid < 0
2164       return (previous_job_ids + job_ids[:job_idx])[reljobid]
2165
2166     for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
2167       for op in ops:
2168         if getattr(op, opcodes.DEPEND_ATTR, None):
2169           (status, data) = \
2170             self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
2171                                          op.depends)
2172           if not status:
2173             # Abort resolving dependencies
2174             assert ht.TNonEmptyString(data), "No error message"
2175             break
2176           # Use resolved dependencies
2177           op.depends = data
2178       else:
2179         try:
2180           job = self._SubmitJobUnlocked(job_id, ops)
2181         except errors.GenericError, err:
2182           status = False
2183           data = self._FormatSubmitError(str(err), ops)
2184         else:
2185           status = True
2186           data = job_id
2187           added_jobs.append(job)
2188
2189       results.append((status, data))
2190
2191     return (results, added_jobs)
2192
2193   @locking.ssynchronized(_LOCK)
2194   def _EnqueueJobs(self, jobs):
2195     """Helper function to add jobs to worker pool's queue.
2196
2197     @type jobs: list
2198     @param jobs: List of all jobs
2199
2200     """
2201     return self._EnqueueJobsUnlocked(jobs)
2202
2203   def _EnqueueJobsUnlocked(self, jobs):
2204     """Helper function to add jobs to worker pool's queue.
2205
2206     @type jobs: list
2207     @param jobs: List of all jobs
2208
2209     """
2210     assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
2211     self._wpool.AddManyTasks([(job, ) for job in jobs],
2212                              priority=[job.CalcPriority() for job in jobs])
2213
2214   def _GetJobStatusForDependencies(self, job_id):
2215     """Gets the status of a job for dependencies.
2216
2217     @type job_id: string
2218     @param job_id: Job ID
2219     @raise errors.JobLost: If job can't be found
2220
2221     """
2222     if not isinstance(job_id, basestring):
2223       job_id = self._FormatJobID(job_id)
2224
2225     # Not using in-memory cache as doing so would require an exclusive lock
2226
2227     # Try to load from disk
2228     job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2229
2230     assert not job.writable, "Got writable job" # pylint: disable=E1101
2231
2232     if job:
2233       return job.CalcStatus()
2234
2235     raise errors.JobLost("Job %s not found" % job_id)
2236
2237   @_RequireOpenQueue
2238   def UpdateJobUnlocked(self, job, replicate=True):
2239     """Update a job's on disk storage.
2240
2241     After a job has been modified, this function needs to be called in
2242     order to write the changes to disk and replicate them to the other
2243     nodes.
2244
2245     @type job: L{_QueuedJob}
2246     @param job: the changed job
2247     @type replicate: boolean
2248     @param replicate: whether to replicate the change to remote nodes
2249
2250     """
2251     if __debug__:
2252       finalized = job.CalcStatus() in constants.JOBS_FINALIZED
2253       assert (finalized ^ (job.end_timestamp is None))
2254       assert job.writable, "Can't update read-only job"
2255
2256     filename = self._GetJobPath(job.id)
2257     data = serializer.DumpJson(job.Serialize())
2258     logging.debug("Writing job %s to %s", job.id, filename)
2259     self._UpdateJobQueueFile(filename, data, replicate)
2260
2261   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
2262                         timeout):
2263     """Waits for changes in a job.
2264
2265     @type job_id: string
2266     @param job_id: Job identifier
2267     @type fields: list of strings
2268     @param fields: Which fields to check for changes
2269     @type prev_job_info: list or None
2270     @param prev_job_info: Last job information returned
2271     @type prev_log_serial: int
2272     @param prev_log_serial: Last job message serial number
2273     @type timeout: float
2274     @param timeout: maximum time to wait in seconds
2275     @rtype: tuple (job info, log entries)
2276     @return: a tuple of the job information as required via
2277         the fields parameter, and the log entries as a list
2278
2279         if the job has not changed and the timeout has expired,
2280         we instead return a special value,
2281         L{constants.JOB_NOTCHANGED}, which should be interpreted
2282         as such by the clients
2283
2284     """
2285     load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, False,
2286                              writable=False)
2287
2288     helper = _WaitForJobChangesHelper()
2289
2290     return helper(self._GetJobPath(job_id), load_fn,
2291                   fields, prev_job_info, prev_log_serial, timeout)
2292
2293   @locking.ssynchronized(_LOCK)
2294   @_RequireOpenQueue
2295   def CancelJob(self, job_id):
2296     """Cancels a job.
2297
2298     This will only succeed if the job has not started yet.
2299
2300     @type job_id: string
2301     @param job_id: job ID of job to be cancelled.
2302
2303     """
2304     logging.info("Cancelling job %s", job_id)
2305
2306     job = self._LoadJobUnlocked(job_id)
2307     if not job:
2308       logging.debug("Job %s not found", job_id)
2309       return (False, "Job %s not found" % job_id)
2310
2311     assert job.writable, "Can't cancel read-only job"
2312
2313     (success, msg) = job.Cancel()
2314
2315     if success:
2316       # If the job was finalized (e.g. cancelled), this is the final write
2317       # allowed. The job can be archived anytime.
2318       self.UpdateJobUnlocked(job)
2319
2320     return (success, msg)
2321
2322   @_RequireOpenQueue
2323   def _ArchiveJobsUnlocked(self, jobs):
2324     """Archives jobs.
2325
2326     @type jobs: list of L{_QueuedJob}
2327     @param jobs: Job objects
2328     @rtype: int
2329     @return: Number of archived jobs
2330
2331     """
2332     archive_jobs = []
2333     rename_files = []
2334     for job in jobs:
2335       assert job.writable, "Can't archive read-only job"
2336
2337       if job.CalcStatus() not in constants.JOBS_FINALIZED:
2338         logging.debug("Job %s is not yet done", job.id)
2339         continue
2340
2341       archive_jobs.append(job)
2342
2343       old = self._GetJobPath(job.id)
2344       new = self._GetArchivedJobPath(job.id)
2345       rename_files.append((old, new))
2346
2347     # TODO: What if 1..n files fail to rename?
2348     self._RenameFilesUnlocked(rename_files)
2349
2350     logging.debug("Successfully archived job(s) %s",
2351                   utils.CommaJoin(job.id for job in archive_jobs))
2352
2353     # Since we haven't quite checked, above, if we succeeded or failed renaming
2354     # the files, we update the cached queue size from the filesystem. When we
2355     # get around to fix the TODO: above, we can use the number of actually
2356     # archived jobs to fix this.
2357     self._UpdateQueueSizeUnlocked()
2358     return len(archive_jobs)
2359
2360   @locking.ssynchronized(_LOCK)
2361   @_RequireOpenQueue
2362   def ArchiveJob(self, job_id):
2363     """Archives a job.
2364
2365     This is just a wrapper over L{_ArchiveJobsUnlocked}.
2366
2367     @type job_id: string
2368     @param job_id: Job ID of job to be archived.
2369     @rtype: bool
2370     @return: Whether job was archived
2371
2372     """
2373     logging.info("Archiving job %s", job_id)
2374
2375     job = self._LoadJobUnlocked(job_id)
2376     if not job:
2377       logging.debug("Job %s not found", job_id)
2378       return False
2379
2380     return self._ArchiveJobsUnlocked([job]) == 1
2381
2382   @locking.ssynchronized(_LOCK)
2383   @_RequireOpenQueue
2384   def AutoArchiveJobs(self, age, timeout):
2385     """Archives all jobs based on age.
2386
2387     The method will archive all jobs which are older than the age
2388     parameter. For jobs that don't have an end timestamp, the start
2389     timestamp will be considered. The special '-1' age will cause
2390     archival of all jobs (that are not running or queued).
2391
2392     @type age: int
2393     @param age: the minimum age in seconds
2394
2395     """
2396     logging.info("Archiving jobs with age more than %s seconds", age)
2397
2398     now = time.time()
2399     end_time = now + timeout
2400     archived_count = 0
2401     last_touched = 0
2402
2403     all_job_ids = self._GetJobIDsUnlocked()
2404     pending = []
2405     for idx, job_id in enumerate(all_job_ids):
2406       last_touched = idx + 1
2407
2408       # Not optimal because jobs could be pending
2409       # TODO: Measure average duration for job archival and take number of
2410       # pending jobs into account.
2411       if time.time() > end_time:
2412         break
2413
2414       # Returns None if the job failed to load
2415       job = self._LoadJobUnlocked(job_id)
2416       if job:
2417         if job.end_timestamp is None:
2418           if job.start_timestamp is None:
2419             job_age = job.received_timestamp
2420           else:
2421             job_age = job.start_timestamp
2422         else:
2423           job_age = job.end_timestamp
2424
2425         if age == -1 or now - job_age[0] > age:
2426           pending.append(job)
2427
2428           # Archive 10 jobs at a time
2429           if len(pending) >= 10:
2430             archived_count += self._ArchiveJobsUnlocked(pending)
2431             pending = []
2432
2433     if pending:
2434       archived_count += self._ArchiveJobsUnlocked(pending)
2435
2436     return (archived_count, len(all_job_ids) - last_touched)
2437
2438   def _Query(self, fields, qfilter):
2439     qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
2440                        namefield="id")
2441
2442     job_ids = qobj.RequestedNames()
2443
2444     list_all = (job_ids is None)
2445
2446     if list_all:
2447       # Since files are added to/removed from the queue atomically, there's no
2448       # risk of getting the job ids in an inconsistent state.
2449       job_ids = self._GetJobIDsUnlocked()
2450
2451     jobs = []
2452
2453     for job_id in job_ids:
2454       job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2455       if job is not None or not list_all:
2456         jobs.append((job_id, job))
2457
2458     return (qobj, jobs, list_all)
2459
2460   def QueryJobs(self, fields, qfilter):
2461     """Returns a list of jobs in queue.
2462
2463     @type fields: sequence
2464     @param fields: List of wanted fields
2465     @type qfilter: None or query2 filter (list)
2466     @param qfilter: Query filter
2467
2468     """
2469     (qobj, ctx, sort_by_name) = self._Query(fields, qfilter)
2470
2471     return query.GetQueryResponse(qobj, ctx, sort_by_name=sort_by_name)
2472
2473   def OldStyleQueryJobs(self, job_ids, fields):
2474     """Returns a list of jobs in queue.
2475
2476     @type job_ids: list
2477     @param job_ids: sequence of job identifiers or None for all
2478     @type fields: list
2479     @param fields: names of fields to return
2480     @rtype: list
2481     @return: list one element per job, each element being list with
2482         the requested fields
2483
2484     """
2485     qfilter = qlang.MakeSimpleFilter("id", job_ids)
2486
2487     (qobj, ctx, sort_by_name) = self._Query(fields, qfilter)
2488
2489     return qobj.OldStyleQuery(ctx, sort_by_name=sort_by_name)
2490
2491   @locking.ssynchronized(_LOCK)
2492   def PrepareShutdown(self):
2493     """Prepare to stop the job queue.
2494
2495     Disables execution of jobs in the workerpool and returns whether there are
2496     any jobs currently running. If the latter is the case, the job queue is not
2497     yet ready for shutdown. Once this function returns C{True} L{Shutdown} can
2498     be called without interfering with any job. Queued and unfinished jobs will
2499     be resumed next time.
2500
2501     Once this function has been called no new job submissions will be accepted
2502     (see L{_RequireNonDrainedQueue}).
2503
2504     @rtype: bool
2505     @return: Whether there are any running jobs
2506
2507     """
2508     if self._accepting_jobs:
2509       self._accepting_jobs = False
2510
2511       # Tell worker pool to stop processing pending tasks
2512       self._wpool.SetActive(False)
2513
2514     return self._wpool.HasRunningTasks()
2515
2516   @locking.ssynchronized(_LOCK)
2517   @_RequireOpenQueue
2518   def Shutdown(self):
2519     """Stops the job queue.
2520
2521     This shutdowns all the worker threads an closes the queue.
2522
2523     """
2524     self._wpool.TerminateWorkers()
2525
2526     self._queue_filelock.Close()
2527     self._queue_filelock = None