jqueue: Factorize code to modify job
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import logging
33 import errno
34 import time
35 import weakref
36 import threading
37 import itertools
38
39 try:
40   # pylint: disable=E0611
41   from pyinotify import pyinotify
42 except ImportError:
43   import pyinotify
44
45 from ganeti import asyncnotifier
46 from ganeti import constants
47 from ganeti import serializer
48 from ganeti import workerpool
49 from ganeti import locking
50 from ganeti import opcodes
51 from ganeti import errors
52 from ganeti import mcpu
53 from ganeti import utils
54 from ganeti import jstore
55 from ganeti import rpc
56 from ganeti import runtime
57 from ganeti import netutils
58 from ganeti import compat
59 from ganeti import ht
60 from ganeti import query
61 from ganeti import qlang
62 from ganeti import pathutils
63 from ganeti import vcluster
64
65
66 JOBQUEUE_THREADS = 25
67
68 # member lock names to be passed to @ssynchronized decorator
69 _LOCK = "_lock"
70 _QUEUE = "_queue"
71
72
73 class CancelJob(Exception):
74   """Special exception to cancel a job.
75
76   """
77
78
79 def TimeStampNow():
80   """Returns the current timestamp.
81
82   @rtype: tuple
83   @return: the current time in the (seconds, microseconds) format
84
85   """
86   return utils.SplitTime(time.time())
87
88
89 def _CallJqUpdate(runner, names, file_name, content):
90   """Updates job queue file after virtualizing filename.
91
92   """
93   virt_file_name = vcluster.MakeVirtualPath(file_name)
94   return runner.call_jobqueue_update(names, virt_file_name, content)
95
96
97 class _SimpleJobQuery:
98   """Wrapper for job queries.
99
100   Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.
101
102   """
103   def __init__(self, fields):
104     """Initializes this class.
105
106     """
107     self._query = query.Query(query.JOB_FIELDS, fields)
108
109   def __call__(self, job):
110     """Executes a job query using cached field list.
111
112     """
113     return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
114
115
116 class _QueuedOpCode(object):
117   """Encapsulates an opcode object.
118
119   @ivar log: holds the execution log and consists of tuples
120   of the form C{(log_serial, timestamp, level, message)}
121   @ivar input: the OpCode we encapsulate
122   @ivar status: the current status
123   @ivar result: the result of the LU execution
124   @ivar start_timestamp: timestamp for the start of the execution
125   @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
126   @ivar stop_timestamp: timestamp for the end of the execution
127
128   """
129   __slots__ = ["input", "status", "result", "log", "priority",
130                "start_timestamp", "exec_timestamp", "end_timestamp",
131                "__weakref__"]
132
133   def __init__(self, op):
134     """Initializes instances of this class.
135
136     @type op: L{opcodes.OpCode}
137     @param op: the opcode we encapsulate
138
139     """
140     self.input = op
141     self.status = constants.OP_STATUS_QUEUED
142     self.result = None
143     self.log = []
144     self.start_timestamp = None
145     self.exec_timestamp = None
146     self.end_timestamp = None
147
148     # Get initial priority (it might change during the lifetime of this opcode)
149     self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
150
151   @classmethod
152   def Restore(cls, state):
153     """Restore the _QueuedOpCode from the serialized form.
154
155     @type state: dict
156     @param state: the serialized state
157     @rtype: _QueuedOpCode
158     @return: a new _QueuedOpCode instance
159
160     """
161     obj = _QueuedOpCode.__new__(cls)
162     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
163     obj.status = state["status"]
164     obj.result = state["result"]
165     obj.log = state["log"]
166     obj.start_timestamp = state.get("start_timestamp", None)
167     obj.exec_timestamp = state.get("exec_timestamp", None)
168     obj.end_timestamp = state.get("end_timestamp", None)
169     obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
170     return obj
171
172   def Serialize(self):
173     """Serializes this _QueuedOpCode.
174
175     @rtype: dict
176     @return: the dictionary holding the serialized state
177
178     """
179     return {
180       "input": self.input.__getstate__(),
181       "status": self.status,
182       "result": self.result,
183       "log": self.log,
184       "start_timestamp": self.start_timestamp,
185       "exec_timestamp": self.exec_timestamp,
186       "end_timestamp": self.end_timestamp,
187       "priority": self.priority,
188       }
189
190
191 class _QueuedJob(object):
192   """In-memory job representation.
193
194   This is what we use to track the user-submitted jobs. Locking must
195   be taken care of by users of this class.
196
197   @type queue: L{JobQueue}
198   @ivar queue: the parent queue
199   @ivar id: the job ID
200   @type ops: list
201   @ivar ops: the list of _QueuedOpCode that constitute the job
202   @type log_serial: int
203   @ivar log_serial: holds the index for the next log entry
204   @ivar received_timestamp: the timestamp for when the job was received
205   @ivar start_timestmap: the timestamp for start of execution
206   @ivar end_timestamp: the timestamp for end of execution
207   @ivar writable: Whether the job is allowed to be modified
208
209   """
210   # pylint: disable=W0212
211   __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
212                "received_timestamp", "start_timestamp", "end_timestamp",
213                "__weakref__", "processor_lock", "writable", "archived"]
214
215   def __init__(self, queue, job_id, ops, writable):
216     """Constructor for the _QueuedJob.
217
218     @type queue: L{JobQueue}
219     @param queue: our parent queue
220     @type job_id: job_id
221     @param job_id: our job id
222     @type ops: list
223     @param ops: the list of opcodes we hold, which will be encapsulated
224         in _QueuedOpCodes
225     @type writable: bool
226     @param writable: Whether job can be modified
227
228     """
229     if not ops:
230       raise errors.GenericError("A job needs at least one opcode")
231
232     self.queue = queue
233     self.id = int(job_id)
234     self.ops = [_QueuedOpCode(op) for op in ops]
235     self.log_serial = 0
236     self.received_timestamp = TimeStampNow()
237     self.start_timestamp = None
238     self.end_timestamp = None
239     self.archived = False
240
241     self._InitInMemory(self, writable)
242
243     assert not self.archived, "New jobs can not be marked as archived"
244
245   @staticmethod
246   def _InitInMemory(obj, writable):
247     """Initializes in-memory variables.
248
249     """
250     obj.writable = writable
251     obj.ops_iter = None
252     obj.cur_opctx = None
253
254     # Read-only jobs are not processed and therefore don't need a lock
255     if writable:
256       obj.processor_lock = threading.Lock()
257     else:
258       obj.processor_lock = None
259
260   def __repr__(self):
261     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
262               "id=%s" % self.id,
263               "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
264
265     return "<%s at %#x>" % (" ".join(status), id(self))
266
267   @classmethod
268   def Restore(cls, queue, state, writable, archived):
269     """Restore a _QueuedJob from serialized state:
270
271     @type queue: L{JobQueue}
272     @param queue: to which queue the restored job belongs
273     @type state: dict
274     @param state: the serialized state
275     @type writable: bool
276     @param writable: Whether job can be modified
277     @type archived: bool
278     @param archived: Whether job was already archived
279     @rtype: _JobQueue
280     @return: the restored _JobQueue instance
281
282     """
283     obj = _QueuedJob.__new__(cls)
284     obj.queue = queue
285     obj.id = int(state["id"])
286     obj.received_timestamp = state.get("received_timestamp", None)
287     obj.start_timestamp = state.get("start_timestamp", None)
288     obj.end_timestamp = state.get("end_timestamp", None)
289     obj.archived = archived
290
291     obj.ops = []
292     obj.log_serial = 0
293     for op_state in state["ops"]:
294       op = _QueuedOpCode.Restore(op_state)
295       for log_entry in op.log:
296         obj.log_serial = max(obj.log_serial, log_entry[0])
297       obj.ops.append(op)
298
299     cls._InitInMemory(obj, writable)
300
301     return obj
302
303   def Serialize(self):
304     """Serialize the _JobQueue instance.
305
306     @rtype: dict
307     @return: the serialized state
308
309     """
310     return {
311       "id": self.id,
312       "ops": [op.Serialize() for op in self.ops],
313       "start_timestamp": self.start_timestamp,
314       "end_timestamp": self.end_timestamp,
315       "received_timestamp": self.received_timestamp,
316       }
317
318   def CalcStatus(self):
319     """Compute the status of this job.
320
321     This function iterates over all the _QueuedOpCodes in the job and
322     based on their status, computes the job status.
323
324     The algorithm is:
325       - if we find a cancelled, or finished with error, the job
326         status will be the same
327       - otherwise, the last opcode with the status one of:
328           - waitlock
329           - canceling
330           - running
331
332         will determine the job status
333
334       - otherwise, it means either all opcodes are queued, or success,
335         and the job status will be the same
336
337     @return: the job status
338
339     """
340     status = constants.JOB_STATUS_QUEUED
341
342     all_success = True
343     for op in self.ops:
344       if op.status == constants.OP_STATUS_SUCCESS:
345         continue
346
347       all_success = False
348
349       if op.status == constants.OP_STATUS_QUEUED:
350         pass
351       elif op.status == constants.OP_STATUS_WAITING:
352         status = constants.JOB_STATUS_WAITING
353       elif op.status == constants.OP_STATUS_RUNNING:
354         status = constants.JOB_STATUS_RUNNING
355       elif op.status == constants.OP_STATUS_CANCELING:
356         status = constants.JOB_STATUS_CANCELING
357         break
358       elif op.status == constants.OP_STATUS_ERROR:
359         status = constants.JOB_STATUS_ERROR
360         # The whole job fails if one opcode failed
361         break
362       elif op.status == constants.OP_STATUS_CANCELED:
363         status = constants.OP_STATUS_CANCELED
364         break
365
366     if all_success:
367       status = constants.JOB_STATUS_SUCCESS
368
369     return status
370
371   def CalcPriority(self):
372     """Gets the current priority for this job.
373
374     Only unfinished opcodes are considered. When all are done, the default
375     priority is used.
376
377     @rtype: int
378
379     """
380     priorities = [op.priority for op in self.ops
381                   if op.status not in constants.OPS_FINALIZED]
382
383     if not priorities:
384       # All opcodes are done, assume default priority
385       return constants.OP_PRIO_DEFAULT
386
387     return min(priorities)
388
389   def GetLogEntries(self, newer_than):
390     """Selectively returns the log entries.
391
392     @type newer_than: None or int
393     @param newer_than: if this is None, return all log entries,
394         otherwise return only the log entries with serial higher
395         than this value
396     @rtype: list
397     @return: the list of the log entries selected
398
399     """
400     if newer_than is None:
401       serial = -1
402     else:
403       serial = newer_than
404
405     entries = []
406     for op in self.ops:
407       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
408
409     return entries
410
411   def GetInfo(self, fields):
412     """Returns information about a job.
413
414     @type fields: list
415     @param fields: names of fields to return
416     @rtype: list
417     @return: list with one element for each field
418     @raise errors.OpExecError: when an invalid field
419         has been passed
420
421     """
422     return _SimpleJobQuery(fields)(self)
423
424   def MarkUnfinishedOps(self, status, result):
425     """Mark unfinished opcodes with a given status and result.
426
427     This is an utility function for marking all running or waiting to
428     be run opcodes with a given status. Opcodes which are already
429     finalised are not changed.
430
431     @param status: a given opcode status
432     @param result: the opcode result
433
434     """
435     not_marked = True
436     for op in self.ops:
437       if op.status in constants.OPS_FINALIZED:
438         assert not_marked, "Finalized opcodes found after non-finalized ones"
439         continue
440       op.status = status
441       op.result = result
442       not_marked = False
443
444   def Finalize(self):
445     """Marks the job as finalized.
446
447     """
448     self.end_timestamp = TimeStampNow()
449
450   def Cancel(self):
451     """Marks job as canceled/-ing if possible.
452
453     @rtype: tuple; (bool, string)
454     @return: Boolean describing whether job was successfully canceled or marked
455       as canceling and a text message
456
457     """
458     status = self.CalcStatus()
459
460     if status == constants.JOB_STATUS_QUEUED:
461       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
462                              "Job canceled by request")
463       self.Finalize()
464       return (True, "Job %s canceled" % self.id)
465
466     elif status == constants.JOB_STATUS_WAITING:
467       # The worker will notice the new status and cancel the job
468       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
469       return (True, "Job %s will be canceled" % self.id)
470
471     else:
472       logging.debug("Job %s is no longer waiting in the queue", self.id)
473       return (False, "Job %s is no longer waiting in the queue" % self.id)
474
475
476 class _OpExecCallbacks(mcpu.OpExecCbBase):
477   def __init__(self, queue, job, op):
478     """Initializes this class.
479
480     @type queue: L{JobQueue}
481     @param queue: Job queue
482     @type job: L{_QueuedJob}
483     @param job: Job object
484     @type op: L{_QueuedOpCode}
485     @param op: OpCode
486
487     """
488     assert queue, "Queue is missing"
489     assert job, "Job is missing"
490     assert op, "Opcode is missing"
491
492     self._queue = queue
493     self._job = job
494     self._op = op
495
496   def _CheckCancel(self):
497     """Raises an exception to cancel the job if asked to.
498
499     """
500     # Cancel here if we were asked to
501     if self._op.status == constants.OP_STATUS_CANCELING:
502       logging.debug("Canceling opcode")
503       raise CancelJob()
504
505   @locking.ssynchronized(_QUEUE, shared=1)
506   def NotifyStart(self):
507     """Mark the opcode as running, not lock-waiting.
508
509     This is called from the mcpu code as a notifier function, when the LU is
510     finally about to start the Exec() method. Of course, to have end-user
511     visible results, the opcode must be initially (before calling into
512     Processor.ExecOpCode) set to OP_STATUS_WAITING.
513
514     """
515     assert self._op in self._job.ops
516     assert self._op.status in (constants.OP_STATUS_WAITING,
517                                constants.OP_STATUS_CANCELING)
518
519     # Cancel here if we were asked to
520     self._CheckCancel()
521
522     logging.debug("Opcode is now running")
523
524     self._op.status = constants.OP_STATUS_RUNNING
525     self._op.exec_timestamp = TimeStampNow()
526
527     # And finally replicate the job status
528     self._queue.UpdateJobUnlocked(self._job)
529
530   @locking.ssynchronized(_QUEUE, shared=1)
531   def _AppendFeedback(self, timestamp, log_type, log_msg):
532     """Internal feedback append function, with locks
533
534     """
535     self._job.log_serial += 1
536     self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
537     self._queue.UpdateJobUnlocked(self._job, replicate=False)
538
539   def Feedback(self, *args):
540     """Append a log entry.
541
542     """
543     assert len(args) < 3
544
545     if len(args) == 1:
546       log_type = constants.ELOG_MESSAGE
547       log_msg = args[0]
548     else:
549       (log_type, log_msg) = args
550
551     # The time is split to make serialization easier and not lose
552     # precision.
553     timestamp = utils.SplitTime(time.time())
554     self._AppendFeedback(timestamp, log_type, log_msg)
555
556   def CheckCancel(self):
557     """Check whether job has been cancelled.
558
559     """
560     assert self._op.status in (constants.OP_STATUS_WAITING,
561                                constants.OP_STATUS_CANCELING)
562
563     # Cancel here if we were asked to
564     self._CheckCancel()
565
566   def SubmitManyJobs(self, jobs):
567     """Submits jobs for processing.
568
569     See L{JobQueue.SubmitManyJobs}.
570
571     """
572     # Locking is done in job queue
573     return self._queue.SubmitManyJobs(jobs)
574
575
576 class _JobChangesChecker(object):
577   def __init__(self, fields, prev_job_info, prev_log_serial):
578     """Initializes this class.
579
580     @type fields: list of strings
581     @param fields: Fields requested by LUXI client
582     @type prev_job_info: string
583     @param prev_job_info: previous job info, as passed by the LUXI client
584     @type prev_log_serial: string
585     @param prev_log_serial: previous job serial, as passed by the LUXI client
586
587     """
588     self._squery = _SimpleJobQuery(fields)
589     self._prev_job_info = prev_job_info
590     self._prev_log_serial = prev_log_serial
591
592   def __call__(self, job):
593     """Checks whether job has changed.
594
595     @type job: L{_QueuedJob}
596     @param job: Job object
597
598     """
599     assert not job.writable, "Expected read-only job"
600
601     status = job.CalcStatus()
602     job_info = self._squery(job)
603     log_entries = job.GetLogEntries(self._prev_log_serial)
604
605     # Serializing and deserializing data can cause type changes (e.g. from
606     # tuple to list) or precision loss. We're doing it here so that we get
607     # the same modifications as the data received from the client. Without
608     # this, the comparison afterwards might fail without the data being
609     # significantly different.
610     # TODO: we just deserialized from disk, investigate how to make sure that
611     # the job info and log entries are compatible to avoid this further step.
612     # TODO: Doing something like in testutils.py:UnifyValueType might be more
613     # efficient, though floats will be tricky
614     job_info = serializer.LoadJson(serializer.DumpJson(job_info))
615     log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
616
617     # Don't even try to wait if the job is no longer running, there will be
618     # no changes.
619     if (status not in (constants.JOB_STATUS_QUEUED,
620                        constants.JOB_STATUS_RUNNING,
621                        constants.JOB_STATUS_WAITING) or
622         job_info != self._prev_job_info or
623         (log_entries and self._prev_log_serial != log_entries[0][0])):
624       logging.debug("Job %s changed", job.id)
625       return (job_info, log_entries)
626
627     return None
628
629
630 class _JobFileChangesWaiter(object):
631   def __init__(self, filename):
632     """Initializes this class.
633
634     @type filename: string
635     @param filename: Path to job file
636     @raises errors.InotifyError: if the notifier cannot be setup
637
638     """
639     self._wm = pyinotify.WatchManager()
640     self._inotify_handler = \
641       asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
642     self._notifier = \
643       pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
644     try:
645       self._inotify_handler.enable()
646     except Exception:
647       # pyinotify doesn't close file descriptors automatically
648       self._notifier.stop()
649       raise
650
651   def _OnInotify(self, notifier_enabled):
652     """Callback for inotify.
653
654     """
655     if not notifier_enabled:
656       self._inotify_handler.enable()
657
658   def Wait(self, timeout):
659     """Waits for the job file to change.
660
661     @type timeout: float
662     @param timeout: Timeout in seconds
663     @return: Whether there have been events
664
665     """
666     assert timeout >= 0
667     have_events = self._notifier.check_events(timeout * 1000)
668     if have_events:
669       self._notifier.read_events()
670     self._notifier.process_events()
671     return have_events
672
673   def Close(self):
674     """Closes underlying notifier and its file descriptor.
675
676     """
677     self._notifier.stop()
678
679
680 class _JobChangesWaiter(object):
681   def __init__(self, filename):
682     """Initializes this class.
683
684     @type filename: string
685     @param filename: Path to job file
686
687     """
688     self._filewaiter = None
689     self._filename = filename
690
691   def Wait(self, timeout):
692     """Waits for a job to change.
693
694     @type timeout: float
695     @param timeout: Timeout in seconds
696     @return: Whether there have been events
697
698     """
699     if self._filewaiter:
700       return self._filewaiter.Wait(timeout)
701
702     # Lazy setup: Avoid inotify setup cost when job file has already changed.
703     # If this point is reached, return immediately and let caller check the job
704     # file again in case there were changes since the last check. This avoids a
705     # race condition.
706     self._filewaiter = _JobFileChangesWaiter(self._filename)
707
708     return True
709
710   def Close(self):
711     """Closes underlying waiter.
712
713     """
714     if self._filewaiter:
715       self._filewaiter.Close()
716
717
718 class _WaitForJobChangesHelper(object):
719   """Helper class using inotify to wait for changes in a job file.
720
721   This class takes a previous job status and serial, and alerts the client when
722   the current job status has changed.
723
724   """
725   @staticmethod
726   def _CheckForChanges(counter, job_load_fn, check_fn):
727     if counter.next() > 0:
728       # If this isn't the first check the job is given some more time to change
729       # again. This gives better performance for jobs generating many
730       # changes/messages.
731       time.sleep(0.1)
732
733     job = job_load_fn()
734     if not job:
735       raise errors.JobLost()
736
737     result = check_fn(job)
738     if result is None:
739       raise utils.RetryAgain()
740
741     return result
742
743   def __call__(self, filename, job_load_fn,
744                fields, prev_job_info, prev_log_serial, timeout):
745     """Waits for changes on a job.
746
747     @type filename: string
748     @param filename: File on which to wait for changes
749     @type job_load_fn: callable
750     @param job_load_fn: Function to load job
751     @type fields: list of strings
752     @param fields: Which fields to check for changes
753     @type prev_job_info: list or None
754     @param prev_job_info: Last job information returned
755     @type prev_log_serial: int
756     @param prev_log_serial: Last job message serial number
757     @type timeout: float
758     @param timeout: maximum time to wait in seconds
759
760     """
761     counter = itertools.count()
762     try:
763       check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
764       waiter = _JobChangesWaiter(filename)
765       try:
766         return utils.Retry(compat.partial(self._CheckForChanges,
767                                           counter, job_load_fn, check_fn),
768                            utils.RETRY_REMAINING_TIME, timeout,
769                            wait_fn=waiter.Wait)
770       finally:
771         waiter.Close()
772     except (errors.InotifyError, errors.JobLost):
773       return None
774     except utils.RetryTimeout:
775       return constants.JOB_NOTCHANGED
776
777
778 def _EncodeOpError(err):
779   """Encodes an error which occurred while processing an opcode.
780
781   """
782   if isinstance(err, errors.GenericError):
783     to_encode = err
784   else:
785     to_encode = errors.OpExecError(str(err))
786
787   return errors.EncodeException(to_encode)
788
789
790 class _TimeoutStrategyWrapper:
791   def __init__(self, fn):
792     """Initializes this class.
793
794     """
795     self._fn = fn
796     self._next = None
797
798   def _Advance(self):
799     """Gets the next timeout if necessary.
800
801     """
802     if self._next is None:
803       self._next = self._fn()
804
805   def Peek(self):
806     """Returns the next timeout.
807
808     """
809     self._Advance()
810     return self._next
811
812   def Next(self):
813     """Returns the current timeout and advances the internal state.
814
815     """
816     self._Advance()
817     result = self._next
818     self._next = None
819     return result
820
821
822 class _OpExecContext:
823   def __init__(self, op, index, log_prefix, timeout_strategy_factory):
824     """Initializes this class.
825
826     """
827     self.op = op
828     self.index = index
829     self.log_prefix = log_prefix
830     self.summary = op.input.Summary()
831
832     # Create local copy to modify
833     if getattr(op.input, opcodes.DEPEND_ATTR, None):
834       self.jobdeps = op.input.depends[:]
835     else:
836       self.jobdeps = None
837
838     self._timeout_strategy_factory = timeout_strategy_factory
839     self._ResetTimeoutStrategy()
840
841   def _ResetTimeoutStrategy(self):
842     """Creates a new timeout strategy.
843
844     """
845     self._timeout_strategy = \
846       _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
847
848   def CheckPriorityIncrease(self):
849     """Checks whether priority can and should be increased.
850
851     Called when locks couldn't be acquired.
852
853     """
854     op = self.op
855
856     # Exhausted all retries and next round should not use blocking acquire
857     # for locks?
858     if (self._timeout_strategy.Peek() is None and
859         op.priority > constants.OP_PRIO_HIGHEST):
860       logging.debug("Increasing priority")
861       op.priority -= 1
862       self._ResetTimeoutStrategy()
863       return True
864
865     return False
866
867   def GetNextLockTimeout(self):
868     """Returns the next lock acquire timeout.
869
870     """
871     return self._timeout_strategy.Next()
872
873
874 class _JobProcessor(object):
875   (DEFER,
876    WAITDEP,
877    FINISHED) = range(1, 4)
878
879   def __init__(self, queue, opexec_fn, job,
880                _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
881     """Initializes this class.
882
883     """
884     self.queue = queue
885     self.opexec_fn = opexec_fn
886     self.job = job
887     self._timeout_strategy_factory = _timeout_strategy_factory
888
889   @staticmethod
890   def _FindNextOpcode(job, timeout_strategy_factory):
891     """Locates the next opcode to run.
892
893     @type job: L{_QueuedJob}
894     @param job: Job object
895     @param timeout_strategy_factory: Callable to create new timeout strategy
896
897     """
898     # Create some sort of a cache to speed up locating next opcode for future
899     # lookups
900     # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
901     # pending and one for processed ops.
902     if job.ops_iter is None:
903       job.ops_iter = enumerate(job.ops)
904
905     # Find next opcode to run
906     while True:
907       try:
908         (idx, op) = job.ops_iter.next()
909       except StopIteration:
910         raise errors.ProgrammerError("Called for a finished job")
911
912       if op.status == constants.OP_STATUS_RUNNING:
913         # Found an opcode already marked as running
914         raise errors.ProgrammerError("Called for job marked as running")
915
916       opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
917                              timeout_strategy_factory)
918
919       if op.status not in constants.OPS_FINALIZED:
920         return opctx
921
922       # This is a job that was partially completed before master daemon
923       # shutdown, so it can be expected that some opcodes are already
924       # completed successfully (if any did error out, then the whole job
925       # should have been aborted and not resubmitted for processing).
926       logging.info("%s: opcode %s already processed, skipping",
927                    opctx.log_prefix, opctx.summary)
928
929   @staticmethod
930   def _MarkWaitlock(job, op):
931     """Marks an opcode as waiting for locks.
932
933     The job's start timestamp is also set if necessary.
934
935     @type job: L{_QueuedJob}
936     @param job: Job object
937     @type op: L{_QueuedOpCode}
938     @param op: Opcode object
939
940     """
941     assert op in job.ops
942     assert op.status in (constants.OP_STATUS_QUEUED,
943                          constants.OP_STATUS_WAITING)
944
945     update = False
946
947     op.result = None
948
949     if op.status == constants.OP_STATUS_QUEUED:
950       op.status = constants.OP_STATUS_WAITING
951       update = True
952
953     if op.start_timestamp is None:
954       op.start_timestamp = TimeStampNow()
955       update = True
956
957     if job.start_timestamp is None:
958       job.start_timestamp = op.start_timestamp
959       update = True
960
961     assert op.status == constants.OP_STATUS_WAITING
962
963     return update
964
965   @staticmethod
966   def _CheckDependencies(queue, job, opctx):
967     """Checks if an opcode has dependencies and if so, processes them.
968
969     @type queue: L{JobQueue}
970     @param queue: Queue object
971     @type job: L{_QueuedJob}
972     @param job: Job object
973     @type opctx: L{_OpExecContext}
974     @param opctx: Opcode execution context
975     @rtype: bool
976     @return: Whether opcode will be re-scheduled by dependency tracker
977
978     """
979     op = opctx.op
980
981     result = False
982
983     while opctx.jobdeps:
984       (dep_job_id, dep_status) = opctx.jobdeps[0]
985
986       (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
987                                                           dep_status)
988       assert ht.TNonEmptyString(depmsg), "No dependency message"
989
990       logging.info("%s: %s", opctx.log_prefix, depmsg)
991
992       if depresult == _JobDependencyManager.CONTINUE:
993         # Remove dependency and continue
994         opctx.jobdeps.pop(0)
995
996       elif depresult == _JobDependencyManager.WAIT:
997         # Need to wait for notification, dependency tracker will re-add job
998         # to workerpool
999         result = True
1000         break
1001
1002       elif depresult == _JobDependencyManager.CANCEL:
1003         # Job was cancelled, cancel this job as well
1004         job.Cancel()
1005         assert op.status == constants.OP_STATUS_CANCELING
1006         break
1007
1008       elif depresult in (_JobDependencyManager.WRONGSTATUS,
1009                          _JobDependencyManager.ERROR):
1010         # Job failed or there was an error, this job must fail
1011         op.status = constants.OP_STATUS_ERROR
1012         op.result = _EncodeOpError(errors.OpExecError(depmsg))
1013         break
1014
1015       else:
1016         raise errors.ProgrammerError("Unknown dependency result '%s'" %
1017                                      depresult)
1018
1019     return result
1020
1021   def _ExecOpCodeUnlocked(self, opctx):
1022     """Processes one opcode and returns the result.
1023
1024     """
1025     op = opctx.op
1026
1027     assert op.status == constants.OP_STATUS_WAITING
1028
1029     timeout = opctx.GetNextLockTimeout()
1030
1031     try:
1032       # Make sure not to hold queue lock while calling ExecOpCode
1033       result = self.opexec_fn(op.input,
1034                               _OpExecCallbacks(self.queue, self.job, op),
1035                               timeout=timeout, priority=op.priority)
1036     except mcpu.LockAcquireTimeout:
1037       assert timeout is not None, "Received timeout for blocking acquire"
1038       logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1039
1040       assert op.status in (constants.OP_STATUS_WAITING,
1041                            constants.OP_STATUS_CANCELING)
1042
1043       # Was job cancelled while we were waiting for the lock?
1044       if op.status == constants.OP_STATUS_CANCELING:
1045         return (constants.OP_STATUS_CANCELING, None)
1046
1047       # Stay in waitlock while trying to re-acquire lock
1048       return (constants.OP_STATUS_WAITING, None)
1049     except CancelJob:
1050       logging.exception("%s: Canceling job", opctx.log_prefix)
1051       assert op.status == constants.OP_STATUS_CANCELING
1052       return (constants.OP_STATUS_CANCELING, None)
1053     except Exception, err: # pylint: disable=W0703
1054       logging.exception("%s: Caught exception in %s",
1055                         opctx.log_prefix, opctx.summary)
1056       return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1057     else:
1058       logging.debug("%s: %s successful",
1059                     opctx.log_prefix, opctx.summary)
1060       return (constants.OP_STATUS_SUCCESS, result)
1061
1062   def __call__(self, _nextop_fn=None):
1063     """Continues execution of a job.
1064
1065     @param _nextop_fn: Callback function for tests
1066     @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1067       be deferred and C{WAITDEP} if the dependency manager
1068       (L{_JobDependencyManager}) will re-schedule the job when appropriate
1069
1070     """
1071     queue = self.queue
1072     job = self.job
1073
1074     logging.debug("Processing job %s", job.id)
1075
1076     queue.acquire(shared=1)
1077     try:
1078       opcount = len(job.ops)
1079
1080       assert job.writable, "Expected writable job"
1081
1082       # Don't do anything for finalized jobs
1083       if job.CalcStatus() in constants.JOBS_FINALIZED:
1084         return self.FINISHED
1085
1086       # Is a previous opcode still pending?
1087       if job.cur_opctx:
1088         opctx = job.cur_opctx
1089         job.cur_opctx = None
1090       else:
1091         if __debug__ and _nextop_fn:
1092           _nextop_fn()
1093         opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1094
1095       op = opctx.op
1096
1097       # Consistency check
1098       assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1099                                      constants.OP_STATUS_CANCELING)
1100                         for i in job.ops[opctx.index + 1:])
1101
1102       assert op.status in (constants.OP_STATUS_QUEUED,
1103                            constants.OP_STATUS_WAITING,
1104                            constants.OP_STATUS_CANCELING)
1105
1106       assert (op.priority <= constants.OP_PRIO_LOWEST and
1107               op.priority >= constants.OP_PRIO_HIGHEST)
1108
1109       waitjob = None
1110
1111       if op.status != constants.OP_STATUS_CANCELING:
1112         assert op.status in (constants.OP_STATUS_QUEUED,
1113                              constants.OP_STATUS_WAITING)
1114
1115         # Prepare to start opcode
1116         if self._MarkWaitlock(job, op):
1117           # Write to disk
1118           queue.UpdateJobUnlocked(job)
1119
1120         assert op.status == constants.OP_STATUS_WAITING
1121         assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1122         assert job.start_timestamp and op.start_timestamp
1123         assert waitjob is None
1124
1125         # Check if waiting for a job is necessary
1126         waitjob = self._CheckDependencies(queue, job, opctx)
1127
1128         assert op.status in (constants.OP_STATUS_WAITING,
1129                              constants.OP_STATUS_CANCELING,
1130                              constants.OP_STATUS_ERROR)
1131
1132         if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1133                                          constants.OP_STATUS_ERROR)):
1134           logging.info("%s: opcode %s waiting for locks",
1135                        opctx.log_prefix, opctx.summary)
1136
1137           assert not opctx.jobdeps, "Not all dependencies were removed"
1138
1139           queue.release()
1140           try:
1141             (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1142           finally:
1143             queue.acquire(shared=1)
1144
1145           op.status = op_status
1146           op.result = op_result
1147
1148           assert not waitjob
1149
1150         if op.status == constants.OP_STATUS_WAITING:
1151           # Couldn't get locks in time
1152           assert not op.end_timestamp
1153         else:
1154           # Finalize opcode
1155           op.end_timestamp = TimeStampNow()
1156
1157           if op.status == constants.OP_STATUS_CANCELING:
1158             assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1159                                   for i in job.ops[opctx.index:])
1160           else:
1161             assert op.status in constants.OPS_FINALIZED
1162
1163       if op.status == constants.OP_STATUS_WAITING or waitjob:
1164         finalize = False
1165
1166         if not waitjob and opctx.CheckPriorityIncrease():
1167           # Priority was changed, need to update on-disk file
1168           queue.UpdateJobUnlocked(job)
1169
1170         # Keep around for another round
1171         job.cur_opctx = opctx
1172
1173         assert (op.priority <= constants.OP_PRIO_LOWEST and
1174                 op.priority >= constants.OP_PRIO_HIGHEST)
1175
1176         # In no case must the status be finalized here
1177         assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1178
1179       else:
1180         # Ensure all opcodes so far have been successful
1181         assert (opctx.index == 0 or
1182                 compat.all(i.status == constants.OP_STATUS_SUCCESS
1183                            for i in job.ops[:opctx.index]))
1184
1185         # Reset context
1186         job.cur_opctx = None
1187
1188         if op.status == constants.OP_STATUS_SUCCESS:
1189           finalize = False
1190
1191         elif op.status == constants.OP_STATUS_ERROR:
1192           # Ensure failed opcode has an exception as its result
1193           assert errors.GetEncodedError(job.ops[opctx.index].result)
1194
1195           to_encode = errors.OpExecError("Preceding opcode failed")
1196           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1197                                 _EncodeOpError(to_encode))
1198           finalize = True
1199
1200           # Consistency check
1201           assert compat.all(i.status == constants.OP_STATUS_ERROR and
1202                             errors.GetEncodedError(i.result)
1203                             for i in job.ops[opctx.index:])
1204
1205         elif op.status == constants.OP_STATUS_CANCELING:
1206           job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1207                                 "Job canceled by request")
1208           finalize = True
1209
1210         else:
1211           raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1212
1213         if opctx.index == (opcount - 1):
1214           # Finalize on last opcode
1215           finalize = True
1216
1217         if finalize:
1218           # All opcodes have been run, finalize job
1219           job.Finalize()
1220
1221         # Write to disk. If the job status is final, this is the final write
1222         # allowed. Once the file has been written, it can be archived anytime.
1223         queue.UpdateJobUnlocked(job)
1224
1225         assert not waitjob
1226
1227         if finalize:
1228           logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1229           return self.FINISHED
1230
1231       assert not waitjob or queue.depmgr.JobWaiting(job)
1232
1233       if waitjob:
1234         return self.WAITDEP
1235       else:
1236         return self.DEFER
1237     finally:
1238       assert job.writable, "Job became read-only while being processed"
1239       queue.release()
1240
1241
1242 def _EvaluateJobProcessorResult(depmgr, job, result):
1243   """Looks at a result from L{_JobProcessor} for a job.
1244
1245   To be used in a L{_JobQueueWorker}.
1246
1247   """
1248   if result == _JobProcessor.FINISHED:
1249     # Notify waiting jobs
1250     depmgr.NotifyWaiters(job.id)
1251
1252   elif result == _JobProcessor.DEFER:
1253     # Schedule again
1254     raise workerpool.DeferTask(priority=job.CalcPriority())
1255
1256   elif result == _JobProcessor.WAITDEP:
1257     # No-op, dependency manager will re-schedule
1258     pass
1259
1260   else:
1261     raise errors.ProgrammerError("Job processor returned unknown status %s" %
1262                                  (result, ))
1263
1264
1265 class _JobQueueWorker(workerpool.BaseWorker):
1266   """The actual job workers.
1267
1268   """
1269   def RunTask(self, job): # pylint: disable=W0221
1270     """Job executor.
1271
1272     @type job: L{_QueuedJob}
1273     @param job: the job to be processed
1274
1275     """
1276     assert job.writable, "Expected writable job"
1277
1278     # Ensure only one worker is active on a single job. If a job registers for
1279     # a dependency job, and the other job notifies before the first worker is
1280     # done, the job can end up in the tasklist more than once.
1281     job.processor_lock.acquire()
1282     try:
1283       return self._RunTaskInner(job)
1284     finally:
1285       job.processor_lock.release()
1286
1287   def _RunTaskInner(self, job):
1288     """Executes a job.
1289
1290     Must be called with per-job lock acquired.
1291
1292     """
1293     queue = job.queue
1294     assert queue == self.pool.queue
1295
1296     setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1297     setname_fn(None)
1298
1299     proc = mcpu.Processor(queue.context, job.id)
1300
1301     # Create wrapper for setting thread name
1302     wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1303                                     proc.ExecOpCode)
1304
1305     _EvaluateJobProcessorResult(queue.depmgr, job,
1306                                 _JobProcessor(queue, wrap_execop_fn, job)())
1307
1308   @staticmethod
1309   def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1310     """Updates the worker thread name to include a short summary of the opcode.
1311
1312     @param setname_fn: Callable setting worker thread name
1313     @param execop_fn: Callable for executing opcode (usually
1314                       L{mcpu.Processor.ExecOpCode})
1315
1316     """
1317     setname_fn(op)
1318     try:
1319       return execop_fn(op, *args, **kwargs)
1320     finally:
1321       setname_fn(None)
1322
1323   @staticmethod
1324   def _GetWorkerName(job, op):
1325     """Sets the worker thread name.
1326
1327     @type job: L{_QueuedJob}
1328     @type op: L{opcodes.OpCode}
1329
1330     """
1331     parts = ["Job%s" % job.id]
1332
1333     if op:
1334       parts.append(op.TinySummary())
1335
1336     return "/".join(parts)
1337
1338
1339 class _JobQueueWorkerPool(workerpool.WorkerPool):
1340   """Simple class implementing a job-processing workerpool.
1341
1342   """
1343   def __init__(self, queue):
1344     super(_JobQueueWorkerPool, self).__init__("Jq",
1345                                               JOBQUEUE_THREADS,
1346                                               _JobQueueWorker)
1347     self.queue = queue
1348
1349
1350 class _JobDependencyManager:
1351   """Keeps track of job dependencies.
1352
1353   """
1354   (WAIT,
1355    ERROR,
1356    CANCEL,
1357    CONTINUE,
1358    WRONGSTATUS) = range(1, 6)
1359
1360   def __init__(self, getstatus_fn, enqueue_fn):
1361     """Initializes this class.
1362
1363     """
1364     self._getstatus_fn = getstatus_fn
1365     self._enqueue_fn = enqueue_fn
1366
1367     self._waiters = {}
1368     self._lock = locking.SharedLock("JobDepMgr")
1369
1370   @locking.ssynchronized(_LOCK, shared=1)
1371   def GetLockInfo(self, requested): # pylint: disable=W0613
1372     """Retrieves information about waiting jobs.
1373
1374     @type requested: set
1375     @param requested: Requested information, see C{query.LQ_*}
1376
1377     """
1378     # No need to sort here, that's being done by the lock manager and query
1379     # library. There are no priorities for notifying jobs, hence all show up as
1380     # one item under "pending".
1381     return [("job/%s" % job_id, None, None,
1382              [("job", [job.id for job in waiters])])
1383             for job_id, waiters in self._waiters.items()
1384             if waiters]
1385
1386   @locking.ssynchronized(_LOCK, shared=1)
1387   def JobWaiting(self, job):
1388     """Checks if a job is waiting.
1389
1390     """
1391     return compat.any(job in jobs
1392                       for jobs in self._waiters.values())
1393
1394   @locking.ssynchronized(_LOCK)
1395   def CheckAndRegister(self, job, dep_job_id, dep_status):
1396     """Checks if a dependency job has the requested status.
1397
1398     If the other job is not yet in a finalized status, the calling job will be
1399     notified (re-added to the workerpool) at a later point.
1400
1401     @type job: L{_QueuedJob}
1402     @param job: Job object
1403     @type dep_job_id: int
1404     @param dep_job_id: ID of dependency job
1405     @type dep_status: list
1406     @param dep_status: Required status
1407
1408     """
1409     assert ht.TJobId(job.id)
1410     assert ht.TJobId(dep_job_id)
1411     assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1412
1413     if job.id == dep_job_id:
1414       return (self.ERROR, "Job can't depend on itself")
1415
1416     # Get status of dependency job
1417     try:
1418       status = self._getstatus_fn(dep_job_id)
1419     except errors.JobLost, err:
1420       return (self.ERROR, "Dependency error: %s" % err)
1421
1422     assert status in constants.JOB_STATUS_ALL
1423
1424     job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1425
1426     if status not in constants.JOBS_FINALIZED:
1427       # Register for notification and wait for job to finish
1428       job_id_waiters.add(job)
1429       return (self.WAIT,
1430               "Need to wait for job %s, wanted status '%s'" %
1431               (dep_job_id, dep_status))
1432
1433     # Remove from waiters list
1434     if job in job_id_waiters:
1435       job_id_waiters.remove(job)
1436
1437     if (status == constants.JOB_STATUS_CANCELED and
1438         constants.JOB_STATUS_CANCELED not in dep_status):
1439       return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1440
1441     elif not dep_status or status in dep_status:
1442       return (self.CONTINUE,
1443               "Dependency job %s finished with status '%s'" %
1444               (dep_job_id, status))
1445
1446     else:
1447       return (self.WRONGSTATUS,
1448               "Dependency job %s finished with status '%s',"
1449               " not one of '%s' as required" %
1450               (dep_job_id, status, utils.CommaJoin(dep_status)))
1451
1452   def _RemoveEmptyWaitersUnlocked(self):
1453     """Remove all jobs without actual waiters.
1454
1455     """
1456     for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1457                    if not waiters]:
1458       del self._waiters[job_id]
1459
1460   def NotifyWaiters(self, job_id):
1461     """Notifies all jobs waiting for a certain job ID.
1462
1463     @attention: Do not call until L{CheckAndRegister} returned a status other
1464       than C{WAITDEP} for C{job_id}, or behaviour is undefined
1465     @type job_id: int
1466     @param job_id: Job ID
1467
1468     """
1469     assert ht.TJobId(job_id)
1470
1471     self._lock.acquire()
1472     try:
1473       self._RemoveEmptyWaitersUnlocked()
1474
1475       jobs = self._waiters.pop(job_id, None)
1476     finally:
1477       self._lock.release()
1478
1479     if jobs:
1480       # Re-add jobs to workerpool
1481       logging.debug("Re-adding %s jobs which were waiting for job %s",
1482                     len(jobs), job_id)
1483       self._enqueue_fn(jobs)
1484
1485
1486 def _RequireOpenQueue(fn):
1487   """Decorator for "public" functions.
1488
1489   This function should be used for all 'public' functions. That is,
1490   functions usually called from other classes. Note that this should
1491   be applied only to methods (not plain functions), since it expects
1492   that the decorated function is called with a first argument that has
1493   a '_queue_filelock' argument.
1494
1495   @warning: Use this decorator only after locking.ssynchronized
1496
1497   Example::
1498     @locking.ssynchronized(_LOCK)
1499     @_RequireOpenQueue
1500     def Example(self):
1501       pass
1502
1503   """
1504   def wrapper(self, *args, **kwargs):
1505     # pylint: disable=W0212
1506     assert self._queue_filelock is not None, "Queue should be open"
1507     return fn(self, *args, **kwargs)
1508   return wrapper
1509
1510
1511 def _RequireNonDrainedQueue(fn):
1512   """Decorator checking for a non-drained queue.
1513
1514   To be used with functions submitting new jobs.
1515
1516   """
1517   def wrapper(self, *args, **kwargs):
1518     """Wrapper function.
1519
1520     @raise errors.JobQueueDrainError: if the job queue is marked for draining
1521
1522     """
1523     # Ok when sharing the big job queue lock, as the drain file is created when
1524     # the lock is exclusive.
1525     # Needs access to protected member, pylint: disable=W0212
1526     if self._drained:
1527       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1528
1529     if not self._accepting_jobs:
1530       raise errors.JobQueueError("Job queue is shutting down, refusing job")
1531
1532     return fn(self, *args, **kwargs)
1533   return wrapper
1534
1535
1536 class JobQueue(object):
1537   """Queue used to manage the jobs.
1538
1539   """
1540   def __init__(self, context):
1541     """Constructor for JobQueue.
1542
1543     The constructor will initialize the job queue object and then
1544     start loading the current jobs from disk, either for starting them
1545     (if they were queue) or for aborting them (if they were already
1546     running).
1547
1548     @type context: GanetiContext
1549     @param context: the context object for access to the configuration
1550         data and other ganeti objects
1551
1552     """
1553     self.context = context
1554     self._memcache = weakref.WeakValueDictionary()
1555     self._my_hostname = netutils.Hostname.GetSysName()
1556
1557     # The Big JobQueue lock. If a code block or method acquires it in shared
1558     # mode safe it must guarantee concurrency with all the code acquiring it in
1559     # shared mode, including itself. In order not to acquire it at all
1560     # concurrency must be guaranteed with all code acquiring it in shared mode
1561     # and all code acquiring it exclusively.
1562     self._lock = locking.SharedLock("JobQueue")
1563
1564     self.acquire = self._lock.acquire
1565     self.release = self._lock.release
1566
1567     # Accept jobs by default
1568     self._accepting_jobs = True
1569
1570     # Initialize the queue, and acquire the filelock.
1571     # This ensures no other process is working on the job queue.
1572     self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1573
1574     # Read serial file
1575     self._last_serial = jstore.ReadSerial()
1576     assert self._last_serial is not None, ("Serial file was modified between"
1577                                            " check in jstore and here")
1578
1579     # Get initial list of nodes
1580     self._nodes = dict((n.name, n.primary_ip)
1581                        for n in self.context.cfg.GetAllNodesInfo().values()
1582                        if n.master_candidate)
1583
1584     # Remove master node
1585     self._nodes.pop(self._my_hostname, None)
1586
1587     # TODO: Check consistency across nodes
1588
1589     self._queue_size = None
1590     self._UpdateQueueSizeUnlocked()
1591     assert ht.TInt(self._queue_size)
1592     self._drained = jstore.CheckDrainFlag()
1593
1594     # Job dependencies
1595     self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1596                                         self._EnqueueJobs)
1597     self.context.glm.AddToLockMonitor(self.depmgr)
1598
1599     # Setup worker pool
1600     self._wpool = _JobQueueWorkerPool(self)
1601     try:
1602       self._InspectQueue()
1603     except:
1604       self._wpool.TerminateWorkers()
1605       raise
1606
1607   @locking.ssynchronized(_LOCK)
1608   @_RequireOpenQueue
1609   def _InspectQueue(self):
1610     """Loads the whole job queue and resumes unfinished jobs.
1611
1612     This function needs the lock here because WorkerPool.AddTask() may start a
1613     job while we're still doing our work.
1614
1615     """
1616     logging.info("Inspecting job queue")
1617
1618     restartjobs = []
1619
1620     all_job_ids = self._GetJobIDsUnlocked()
1621     jobs_count = len(all_job_ids)
1622     lastinfo = time.time()
1623     for idx, job_id in enumerate(all_job_ids):
1624       # Give an update every 1000 jobs or 10 seconds
1625       if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1626           idx == (jobs_count - 1)):
1627         logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1628                      idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1629         lastinfo = time.time()
1630
1631       job = self._LoadJobUnlocked(job_id)
1632
1633       # a failure in loading the job can cause 'None' to be returned
1634       if job is None:
1635         continue
1636
1637       status = job.CalcStatus()
1638
1639       if status == constants.JOB_STATUS_QUEUED:
1640         restartjobs.append(job)
1641
1642       elif status in (constants.JOB_STATUS_RUNNING,
1643                       constants.JOB_STATUS_WAITING,
1644                       constants.JOB_STATUS_CANCELING):
1645         logging.warning("Unfinished job %s found: %s", job.id, job)
1646
1647         if status == constants.JOB_STATUS_WAITING:
1648           # Restart job
1649           job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1650           restartjobs.append(job)
1651         else:
1652           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1653                                 "Unclean master daemon shutdown")
1654           job.Finalize()
1655
1656         self.UpdateJobUnlocked(job)
1657
1658     if restartjobs:
1659       logging.info("Restarting %s jobs", len(restartjobs))
1660       self._EnqueueJobsUnlocked(restartjobs)
1661
1662     logging.info("Job queue inspection finished")
1663
1664   def _GetRpc(self, address_list):
1665     """Gets RPC runner with context.
1666
1667     """
1668     return rpc.JobQueueRunner(self.context, address_list)
1669
1670   @locking.ssynchronized(_LOCK)
1671   @_RequireOpenQueue
1672   def AddNode(self, node):
1673     """Register a new node with the queue.
1674
1675     @type node: L{objects.Node}
1676     @param node: the node object to be added
1677
1678     """
1679     node_name = node.name
1680     assert node_name != self._my_hostname
1681
1682     # Clean queue directory on added node
1683     result = self._GetRpc(None).call_jobqueue_purge(node_name)
1684     msg = result.fail_msg
1685     if msg:
1686       logging.warning("Cannot cleanup queue directory on node %s: %s",
1687                       node_name, msg)
1688
1689     if not node.master_candidate:
1690       # remove if existing, ignoring errors
1691       self._nodes.pop(node_name, None)
1692       # and skip the replication of the job ids
1693       return
1694
1695     # Upload the whole queue excluding archived jobs
1696     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1697
1698     # Upload current serial file
1699     files.append(pathutils.JOB_QUEUE_SERIAL_FILE)
1700
1701     # Static address list
1702     addrs = [node.primary_ip]
1703
1704     for file_name in files:
1705       # Read file content
1706       content = utils.ReadFile(file_name)
1707
1708       result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
1709                              file_name, content)
1710       msg = result[node_name].fail_msg
1711       if msg:
1712         logging.error("Failed to upload file %s to node %s: %s",
1713                       file_name, node_name, msg)
1714
1715     self._nodes[node_name] = node.primary_ip
1716
1717   @locking.ssynchronized(_LOCK)
1718   @_RequireOpenQueue
1719   def RemoveNode(self, node_name):
1720     """Callback called when removing nodes from the cluster.
1721
1722     @type node_name: str
1723     @param node_name: the name of the node to remove
1724
1725     """
1726     self._nodes.pop(node_name, None)
1727
1728   @staticmethod
1729   def _CheckRpcResult(result, nodes, failmsg):
1730     """Verifies the status of an RPC call.
1731
1732     Since we aim to keep consistency should this node (the current
1733     master) fail, we will log errors if our rpc fail, and especially
1734     log the case when more than half of the nodes fails.
1735
1736     @param result: the data as returned from the rpc call
1737     @type nodes: list
1738     @param nodes: the list of nodes we made the call to
1739     @type failmsg: str
1740     @param failmsg: the identifier to be used for logging
1741
1742     """
1743     failed = []
1744     success = []
1745
1746     for node in nodes:
1747       msg = result[node].fail_msg
1748       if msg:
1749         failed.append(node)
1750         logging.error("RPC call %s (%s) failed on node %s: %s",
1751                       result[node].call, failmsg, node, msg)
1752       else:
1753         success.append(node)
1754
1755     # +1 for the master node
1756     if (len(success) + 1) < len(failed):
1757       # TODO: Handle failing nodes
1758       logging.error("More than half of the nodes failed")
1759
1760   def _GetNodeIp(self):
1761     """Helper for returning the node name/ip list.
1762
1763     @rtype: (list, list)
1764     @return: a tuple of two lists, the first one with the node
1765         names and the second one with the node addresses
1766
1767     """
1768     # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1769     name_list = self._nodes.keys()
1770     addr_list = [self._nodes[name] for name in name_list]
1771     return name_list, addr_list
1772
1773   def _UpdateJobQueueFile(self, file_name, data, replicate):
1774     """Writes a file locally and then replicates it to all nodes.
1775
1776     This function will replace the contents of a file on the local
1777     node and then replicate it to all the other nodes we have.
1778
1779     @type file_name: str
1780     @param file_name: the path of the file to be replicated
1781     @type data: str
1782     @param data: the new contents of the file
1783     @type replicate: boolean
1784     @param replicate: whether to spread the changes to the remote nodes
1785
1786     """
1787     getents = runtime.GetEnts()
1788     utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1789                     gid=getents.masterd_gid)
1790
1791     if replicate:
1792       names, addrs = self._GetNodeIp()
1793       result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
1794       self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1795
1796   def _RenameFilesUnlocked(self, rename):
1797     """Renames a file locally and then replicate the change.
1798
1799     This function will rename a file in the local queue directory
1800     and then replicate this rename to all the other nodes we have.
1801
1802     @type rename: list of (old, new)
1803     @param rename: List containing tuples mapping old to new names
1804
1805     """
1806     # Rename them locally
1807     for old, new in rename:
1808       utils.RenameFile(old, new, mkdir=True)
1809
1810     # ... and on all nodes
1811     names, addrs = self._GetNodeIp()
1812     result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
1813     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1814
1815   def _NewSerialsUnlocked(self, count):
1816     """Generates a new job identifier.
1817
1818     Job identifiers are unique during the lifetime of a cluster.
1819
1820     @type count: integer
1821     @param count: how many serials to return
1822     @rtype: list of int
1823     @return: a list of job identifiers.
1824
1825     """
1826     assert ht.TPositiveInt(count)
1827
1828     # New number
1829     serial = self._last_serial + count
1830
1831     # Write to file
1832     self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
1833                              "%s\n" % serial, True)
1834
1835     result = [jstore.FormatJobID(v)
1836               for v in range(self._last_serial + 1, serial + 1)]
1837
1838     # Keep it only if we were able to write the file
1839     self._last_serial = serial
1840
1841     assert len(result) == count
1842
1843     return result
1844
1845   @staticmethod
1846   def _GetJobPath(job_id):
1847     """Returns the job file for a given job id.
1848
1849     @type job_id: str
1850     @param job_id: the job identifier
1851     @rtype: str
1852     @return: the path to the job file
1853
1854     """
1855     return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
1856
1857   @staticmethod
1858   def _GetArchivedJobPath(job_id):
1859     """Returns the archived job file for a give job id.
1860
1861     @type job_id: str
1862     @param job_id: the job identifier
1863     @rtype: str
1864     @return: the path to the archived job file
1865
1866     """
1867     return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
1868                           jstore.GetArchiveDirectory(job_id),
1869                           "job-%s" % job_id)
1870
1871   @staticmethod
1872   def _DetermineJobDirectories(archived):
1873     """Build list of directories containing job files.
1874
1875     @type archived: bool
1876     @param archived: Whether to include directories for archived jobs
1877     @rtype: list
1878
1879     """
1880     result = [pathutils.QUEUE_DIR]
1881
1882     if archived:
1883       archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
1884       result.extend(map(compat.partial(utils.PathJoin, archive_path),
1885                         utils.ListVisibleFiles(archive_path)))
1886
1887     return result
1888
1889   @classmethod
1890   def _GetJobIDsUnlocked(cls, sort=True, archived=False):
1891     """Return all known job IDs.
1892
1893     The method only looks at disk because it's a requirement that all
1894     jobs are present on disk (so in the _memcache we don't have any
1895     extra IDs).
1896
1897     @type sort: boolean
1898     @param sort: perform sorting on the returned job ids
1899     @rtype: list
1900     @return: the list of job IDs
1901
1902     """
1903     jlist = []
1904
1905     for path in cls._DetermineJobDirectories(archived):
1906       for filename in utils.ListVisibleFiles(path):
1907         m = constants.JOB_FILE_RE.match(filename)
1908         if m:
1909           jlist.append(int(m.group(1)))
1910
1911     if sort:
1912       jlist.sort()
1913     return jlist
1914
1915   def _LoadJobUnlocked(self, job_id):
1916     """Loads a job from the disk or memory.
1917
1918     Given a job id, this will return the cached job object if
1919     existing, or try to load the job from the disk. If loading from
1920     disk, it will also add the job to the cache.
1921
1922     @type job_id: int
1923     @param job_id: the job id
1924     @rtype: L{_QueuedJob} or None
1925     @return: either None or the job object
1926
1927     """
1928     job = self._memcache.get(job_id, None)
1929     if job:
1930       logging.debug("Found job %s in memcache", job_id)
1931       assert job.writable, "Found read-only job in memcache"
1932       return job
1933
1934     try:
1935       job = self._LoadJobFromDisk(job_id, False)
1936       if job is None:
1937         return job
1938     except errors.JobFileCorrupted:
1939       old_path = self._GetJobPath(job_id)
1940       new_path = self._GetArchivedJobPath(job_id)
1941       if old_path == new_path:
1942         # job already archived (future case)
1943         logging.exception("Can't parse job %s", job_id)
1944       else:
1945         # non-archived case
1946         logging.exception("Can't parse job %s, will archive.", job_id)
1947         self._RenameFilesUnlocked([(old_path, new_path)])
1948       return None
1949
1950     assert job.writable, "Job just loaded is not writable"
1951
1952     self._memcache[job_id] = job
1953     logging.debug("Added job %s to the cache", job_id)
1954     return job
1955
1956   def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
1957     """Load the given job file from disk.
1958
1959     Given a job file, read, load and restore it in a _QueuedJob format.
1960
1961     @type job_id: int
1962     @param job_id: job identifier
1963     @type try_archived: bool
1964     @param try_archived: Whether to try loading an archived job
1965     @rtype: L{_QueuedJob} or None
1966     @return: either None or the job object
1967
1968     """
1969     path_functions = [(self._GetJobPath, False)]
1970
1971     if try_archived:
1972       path_functions.append((self._GetArchivedJobPath, True))
1973
1974     raw_data = None
1975     archived = None
1976
1977     for (fn, archived) in path_functions:
1978       filepath = fn(job_id)
1979       logging.debug("Loading job from %s", filepath)
1980       try:
1981         raw_data = utils.ReadFile(filepath)
1982       except EnvironmentError, err:
1983         if err.errno != errno.ENOENT:
1984           raise
1985       else:
1986         break
1987
1988     if not raw_data:
1989       return None
1990
1991     if writable is None:
1992       writable = not archived
1993
1994     try:
1995       data = serializer.LoadJson(raw_data)
1996       job = _QueuedJob.Restore(self, data, writable, archived)
1997     except Exception, err: # pylint: disable=W0703
1998       raise errors.JobFileCorrupted(err)
1999
2000     return job
2001
2002   def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
2003     """Load the given job file from disk.
2004
2005     Given a job file, read, load and restore it in a _QueuedJob format.
2006     In case of error reading the job, it gets returned as None, and the
2007     exception is logged.
2008
2009     @type job_id: int
2010     @param job_id: job identifier
2011     @type try_archived: bool
2012     @param try_archived: Whether to try loading an archived job
2013     @rtype: L{_QueuedJob} or None
2014     @return: either None or the job object
2015
2016     """
2017     try:
2018       return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
2019     except (errors.JobFileCorrupted, EnvironmentError):
2020       logging.exception("Can't load/parse job %s", job_id)
2021       return None
2022
2023   def _UpdateQueueSizeUnlocked(self):
2024     """Update the queue size.
2025
2026     """
2027     self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
2028
2029   @locking.ssynchronized(_LOCK)
2030   @_RequireOpenQueue
2031   def SetDrainFlag(self, drain_flag):
2032     """Sets the drain flag for the queue.
2033
2034     @type drain_flag: boolean
2035     @param drain_flag: Whether to set or unset the drain flag
2036
2037     """
2038     jstore.SetDrainFlag(drain_flag)
2039
2040     self._drained = drain_flag
2041
2042     return True
2043
2044   @_RequireOpenQueue
2045   def _SubmitJobUnlocked(self, job_id, ops):
2046     """Create and store a new job.
2047
2048     This enters the job into our job queue and also puts it on the new
2049     queue, in order for it to be picked up by the queue processors.
2050
2051     @type job_id: job ID
2052     @param job_id: the job ID for the new job
2053     @type ops: list
2054     @param ops: The list of OpCodes that will become the new job.
2055     @rtype: L{_QueuedJob}
2056     @return: the job object to be queued
2057     @raise errors.JobQueueFull: if the job queue has too many jobs in it
2058     @raise errors.GenericError: If an opcode is not valid
2059
2060     """
2061     if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
2062       raise errors.JobQueueFull()
2063
2064     job = _QueuedJob(self, job_id, ops, True)
2065
2066     for idx, op in enumerate(job.ops):
2067       # Check priority
2068       if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
2069         allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2070         raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
2071                                   " are %s" % (idx, op.priority, allowed))
2072
2073       # Check job dependencies
2074       dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
2075       if not opcodes.TNoRelativeJobDependencies(dependencies):
2076         raise errors.GenericError("Opcode %s has invalid dependencies, must"
2077                                   " match %s: %s" %
2078                                   (idx, opcodes.TNoRelativeJobDependencies,
2079                                    dependencies))
2080
2081     # Write to disk
2082     self.UpdateJobUnlocked(job)
2083
2084     self._queue_size += 1
2085
2086     logging.debug("Adding new job %s to the cache", job_id)
2087     self._memcache[job_id] = job
2088
2089     return job
2090
2091   @locking.ssynchronized(_LOCK)
2092   @_RequireOpenQueue
2093   @_RequireNonDrainedQueue
2094   def SubmitJob(self, ops):
2095     """Create and store a new job.
2096
2097     @see: L{_SubmitJobUnlocked}
2098
2099     """
2100     (job_id, ) = self._NewSerialsUnlocked(1)
2101     self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
2102     return job_id
2103
2104   @locking.ssynchronized(_LOCK)
2105   @_RequireOpenQueue
2106   @_RequireNonDrainedQueue
2107   def SubmitManyJobs(self, jobs):
2108     """Create and store multiple jobs.
2109
2110     @see: L{_SubmitJobUnlocked}
2111
2112     """
2113     all_job_ids = self._NewSerialsUnlocked(len(jobs))
2114
2115     (results, added_jobs) = \
2116       self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])
2117
2118     self._EnqueueJobsUnlocked(added_jobs)
2119
2120     return results
2121
2122   @staticmethod
2123   def _FormatSubmitError(msg, ops):
2124     """Formats errors which occurred while submitting a job.
2125
2126     """
2127     return ("%s; opcodes %s" %
2128             (msg, utils.CommaJoin(op.Summary() for op in ops)))
2129
2130   @staticmethod
2131   def _ResolveJobDependencies(resolve_fn, deps):
2132     """Resolves relative job IDs in dependencies.
2133
2134     @type resolve_fn: callable
2135     @param resolve_fn: Function to resolve a relative job ID
2136     @type deps: list
2137     @param deps: Dependencies
2138     @rtype: tuple; (boolean, string or list)
2139     @return: If successful (first tuple item), the returned list contains
2140       resolved job IDs along with the requested status; if not successful,
2141       the second element is an error message
2142
2143     """
2144     result = []
2145
2146     for (dep_job_id, dep_status) in deps:
2147       if ht.TRelativeJobId(dep_job_id):
2148         assert ht.TInt(dep_job_id) and dep_job_id < 0
2149         try:
2150           job_id = resolve_fn(dep_job_id)
2151         except IndexError:
2152           # Abort
2153           return (False, "Unable to resolve relative job ID %s" % dep_job_id)
2154       else:
2155         job_id = dep_job_id
2156
2157       result.append((job_id, dep_status))
2158
2159     return (True, result)
2160
2161   def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
2162     """Create and store multiple jobs.
2163
2164     @see: L{_SubmitJobUnlocked}
2165
2166     """
2167     results = []
2168     added_jobs = []
2169
2170     def resolve_fn(job_idx, reljobid):
2171       assert reljobid < 0
2172       return (previous_job_ids + job_ids[:job_idx])[reljobid]
2173
2174     for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
2175       for op in ops:
2176         if getattr(op, opcodes.DEPEND_ATTR, None):
2177           (status, data) = \
2178             self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
2179                                          op.depends)
2180           if not status:
2181             # Abort resolving dependencies
2182             assert ht.TNonEmptyString(data), "No error message"
2183             break
2184           # Use resolved dependencies
2185           op.depends = data
2186       else:
2187         try:
2188           job = self._SubmitJobUnlocked(job_id, ops)
2189         except errors.GenericError, err:
2190           status = False
2191           data = self._FormatSubmitError(str(err), ops)
2192         else:
2193           status = True
2194           data = job_id
2195           added_jobs.append(job)
2196
2197       results.append((status, data))
2198
2199     return (results, added_jobs)
2200
2201   @locking.ssynchronized(_LOCK)
2202   def _EnqueueJobs(self, jobs):
2203     """Helper function to add jobs to worker pool's queue.
2204
2205     @type jobs: list
2206     @param jobs: List of all jobs
2207
2208     """
2209     return self._EnqueueJobsUnlocked(jobs)
2210
2211   def _EnqueueJobsUnlocked(self, jobs):
2212     """Helper function to add jobs to worker pool's queue.
2213
2214     @type jobs: list
2215     @param jobs: List of all jobs
2216
2217     """
2218     assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
2219     self._wpool.AddManyTasks([(job, ) for job in jobs],
2220                              priority=[job.CalcPriority() for job in jobs])
2221
2222   def _GetJobStatusForDependencies(self, job_id):
2223     """Gets the status of a job for dependencies.
2224
2225     @type job_id: int
2226     @param job_id: Job ID
2227     @raise errors.JobLost: If job can't be found
2228
2229     """
2230     # Not using in-memory cache as doing so would require an exclusive lock
2231
2232     # Try to load from disk
2233     job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2234
2235     assert not job.writable, "Got writable job" # pylint: disable=E1101
2236
2237     if job:
2238       return job.CalcStatus()
2239
2240     raise errors.JobLost("Job %s not found" % job_id)
2241
2242   @_RequireOpenQueue
2243   def UpdateJobUnlocked(self, job, replicate=True):
2244     """Update a job's on disk storage.
2245
2246     After a job has been modified, this function needs to be called in
2247     order to write the changes to disk and replicate them to the other
2248     nodes.
2249
2250     @type job: L{_QueuedJob}
2251     @param job: the changed job
2252     @type replicate: boolean
2253     @param replicate: whether to replicate the change to remote nodes
2254
2255     """
2256     if __debug__:
2257       finalized = job.CalcStatus() in constants.JOBS_FINALIZED
2258       assert (finalized ^ (job.end_timestamp is None))
2259       assert job.writable, "Can't update read-only job"
2260       assert not job.archived, "Can't update archived job"
2261
2262     filename = self._GetJobPath(job.id)
2263     data = serializer.DumpJson(job.Serialize())
2264     logging.debug("Writing job %s to %s", job.id, filename)
2265     self._UpdateJobQueueFile(filename, data, replicate)
2266
2267   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
2268                         timeout):
2269     """Waits for changes in a job.
2270
2271     @type job_id: int
2272     @param job_id: Job identifier
2273     @type fields: list of strings
2274     @param fields: Which fields to check for changes
2275     @type prev_job_info: list or None
2276     @param prev_job_info: Last job information returned
2277     @type prev_log_serial: int
2278     @param prev_log_serial: Last job message serial number
2279     @type timeout: float
2280     @param timeout: maximum time to wait in seconds
2281     @rtype: tuple (job info, log entries)
2282     @return: a tuple of the job information as required via
2283         the fields parameter, and the log entries as a list
2284
2285         if the job has not changed and the timeout has expired,
2286         we instead return a special value,
2287         L{constants.JOB_NOTCHANGED}, which should be interpreted
2288         as such by the clients
2289
2290     """
2291     load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
2292                              writable=False)
2293
2294     helper = _WaitForJobChangesHelper()
2295
2296     return helper(self._GetJobPath(job_id), load_fn,
2297                   fields, prev_job_info, prev_log_serial, timeout)
2298
2299   @locking.ssynchronized(_LOCK)
2300   @_RequireOpenQueue
2301   def CancelJob(self, job_id):
2302     """Cancels a job.
2303
2304     This will only succeed if the job has not started yet.
2305
2306     @type job_id: int
2307     @param job_id: job ID of job to be cancelled.
2308
2309     """
2310     logging.info("Cancelling job %s", job_id)
2311
2312     return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
2313
2314   def _ModifyJobUnlocked(self, job_id, mod_fn):
2315     """Modifies a job.
2316
2317     @type job_id: int
2318     @param job_id: Job ID
2319     @type mod_fn: callable
2320     @param mod_fn: Modifying function, receiving job object as parameter,
2321       returning tuple of (status boolean, message string)
2322
2323     """
2324     job = self._LoadJobUnlocked(job_id)
2325     if not job:
2326       logging.debug("Job %s not found", job_id)
2327       return (False, "Job %s not found" % job_id)
2328
2329     assert job.writable, "Can't modify read-only job"
2330     assert not job.archived, "Can't modify archived job"
2331
2332     (success, msg) = mod_fn(job)
2333
2334     if success:
2335       # If the job was finalized (e.g. cancelled), this is the final write
2336       # allowed. The job can be archived anytime.
2337       self.UpdateJobUnlocked(job)
2338
2339     return (success, msg)
2340
2341   @_RequireOpenQueue
2342   def _ArchiveJobsUnlocked(self, jobs):
2343     """Archives jobs.
2344
2345     @type jobs: list of L{_QueuedJob}
2346     @param jobs: Job objects
2347     @rtype: int
2348     @return: Number of archived jobs
2349
2350     """
2351     archive_jobs = []
2352     rename_files = []
2353     for job in jobs:
2354       assert job.writable, "Can't archive read-only job"
2355       assert not job.archived, "Can't cancel archived job"
2356
2357       if job.CalcStatus() not in constants.JOBS_FINALIZED:
2358         logging.debug("Job %s is not yet done", job.id)
2359         continue
2360
2361       archive_jobs.append(job)
2362
2363       old = self._GetJobPath(job.id)
2364       new = self._GetArchivedJobPath(job.id)
2365       rename_files.append((old, new))
2366
2367     # TODO: What if 1..n files fail to rename?
2368     self._RenameFilesUnlocked(rename_files)
2369
2370     logging.debug("Successfully archived job(s) %s",
2371                   utils.CommaJoin(job.id for job in archive_jobs))
2372
2373     # Since we haven't quite checked, above, if we succeeded or failed renaming
2374     # the files, we update the cached queue size from the filesystem. When we
2375     # get around to fix the TODO: above, we can use the number of actually
2376     # archived jobs to fix this.
2377     self._UpdateQueueSizeUnlocked()
2378     return len(archive_jobs)
2379
2380   @locking.ssynchronized(_LOCK)
2381   @_RequireOpenQueue
2382   def ArchiveJob(self, job_id):
2383     """Archives a job.
2384
2385     This is just a wrapper over L{_ArchiveJobsUnlocked}.
2386
2387     @type job_id: int
2388     @param job_id: Job ID of job to be archived.
2389     @rtype: bool
2390     @return: Whether job was archived
2391
2392     """
2393     logging.info("Archiving job %s", job_id)
2394
2395     job = self._LoadJobUnlocked(job_id)
2396     if not job:
2397       logging.debug("Job %s not found", job_id)
2398       return False
2399
2400     return self._ArchiveJobsUnlocked([job]) == 1
2401
2402   @locking.ssynchronized(_LOCK)
2403   @_RequireOpenQueue
2404   def AutoArchiveJobs(self, age, timeout):
2405     """Archives all jobs based on age.
2406
2407     The method will archive all jobs which are older than the age
2408     parameter. For jobs that don't have an end timestamp, the start
2409     timestamp will be considered. The special '-1' age will cause
2410     archival of all jobs (that are not running or queued).
2411
2412     @type age: int
2413     @param age: the minimum age in seconds
2414
2415     """
2416     logging.info("Archiving jobs with age more than %s seconds", age)
2417
2418     now = time.time()
2419     end_time = now + timeout
2420     archived_count = 0
2421     last_touched = 0
2422
2423     all_job_ids = self._GetJobIDsUnlocked()
2424     pending = []
2425     for idx, job_id in enumerate(all_job_ids):
2426       last_touched = idx + 1
2427
2428       # Not optimal because jobs could be pending
2429       # TODO: Measure average duration for job archival and take number of
2430       # pending jobs into account.
2431       if time.time() > end_time:
2432         break
2433
2434       # Returns None if the job failed to load
2435       job = self._LoadJobUnlocked(job_id)
2436       if job:
2437         if job.end_timestamp is None:
2438           if job.start_timestamp is None:
2439             job_age = job.received_timestamp
2440           else:
2441             job_age = job.start_timestamp
2442         else:
2443           job_age = job.end_timestamp
2444
2445         if age == -1 or now - job_age[0] > age:
2446           pending.append(job)
2447
2448           # Archive 10 jobs at a time
2449           if len(pending) >= 10:
2450             archived_count += self._ArchiveJobsUnlocked(pending)
2451             pending = []
2452
2453     if pending:
2454       archived_count += self._ArchiveJobsUnlocked(pending)
2455
2456     return (archived_count, len(all_job_ids) - last_touched)
2457
2458   def _Query(self, fields, qfilter):
2459     qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
2460                        namefield="id")
2461
2462     # Archived jobs are only looked at if the "archived" field is referenced
2463     # either as a requested field or in the filter. By default archived jobs
2464     # are ignored.
2465     include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())
2466
2467     job_ids = qobj.RequestedNames()
2468
2469     list_all = (job_ids is None)
2470
2471     if list_all:
2472       # Since files are added to/removed from the queue atomically, there's no
2473       # risk of getting the job ids in an inconsistent state.
2474       job_ids = self._GetJobIDsUnlocked(archived=include_archived)
2475
2476     jobs = []
2477
2478     for job_id in job_ids:
2479       job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2480       if job is not None or not list_all:
2481         jobs.append((job_id, job))
2482
2483     return (qobj, jobs, list_all)
2484
2485   def QueryJobs(self, fields, qfilter):
2486     """Returns a list of jobs in queue.
2487
2488     @type fields: sequence
2489     @param fields: List of wanted fields
2490     @type qfilter: None or query2 filter (list)
2491     @param qfilter: Query filter
2492
2493     """
2494     (qobj, ctx, _) = self._Query(fields, qfilter)
2495
2496     return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
2497
2498   def OldStyleQueryJobs(self, job_ids, fields):
2499     """Returns a list of jobs in queue.
2500
2501     @type job_ids: list
2502     @param job_ids: sequence of job identifiers or None for all
2503     @type fields: list
2504     @param fields: names of fields to return
2505     @rtype: list
2506     @return: list one element per job, each element being list with
2507         the requested fields
2508
2509     """
2510     # backwards compat:
2511     job_ids = [int(jid) for jid in job_ids]
2512     qfilter = qlang.MakeSimpleFilter("id", job_ids)
2513
2514     (qobj, ctx, _) = self._Query(fields, qfilter)
2515
2516     return qobj.OldStyleQuery(ctx, sort_by_name=False)
2517
2518   @locking.ssynchronized(_LOCK)
2519   def PrepareShutdown(self):
2520     """Prepare to stop the job queue.
2521
2522     Disables execution of jobs in the workerpool and returns whether there are
2523     any jobs currently running. If the latter is the case, the job queue is not
2524     yet ready for shutdown. Once this function returns C{True} L{Shutdown} can
2525     be called without interfering with any job. Queued and unfinished jobs will
2526     be resumed next time.
2527
2528     Once this function has been called no new job submissions will be accepted
2529     (see L{_RequireNonDrainedQueue}).
2530
2531     @rtype: bool
2532     @return: Whether there are any running jobs
2533
2534     """
2535     if self._accepting_jobs:
2536       self._accepting_jobs = False
2537
2538       # Tell worker pool to stop processing pending tasks
2539       self._wpool.SetActive(False)
2540
2541     return self._wpool.HasRunningTasks()
2542
2543   @locking.ssynchronized(_LOCK)
2544   @_RequireOpenQueue
2545   def Shutdown(self):
2546     """Stops the job queue.
2547
2548     This shutdowns all the worker threads an closes the queue.
2549
2550     """
2551     self._wpool.TerminateWorkers()
2552
2553     self._queue_filelock.Close()
2554     self._queue_filelock = None