Fix iallocator group objects
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import logging
33 import errno
34 import time
35 import weakref
36 import threading
37 import itertools
38
39 try:
40   # pylint: disable=E0611
41   from pyinotify import pyinotify
42 except ImportError:
43   import pyinotify
44
45 from ganeti import asyncnotifier
46 from ganeti import constants
47 from ganeti import serializer
48 from ganeti import workerpool
49 from ganeti import locking
50 from ganeti import opcodes
51 from ganeti import errors
52 from ganeti import mcpu
53 from ganeti import utils
54 from ganeti import jstore
55 from ganeti import rpc
56 from ganeti import runtime
57 from ganeti import netutils
58 from ganeti import compat
59 from ganeti import ht
60
61
62 JOBQUEUE_THREADS = 25
63 JOBS_PER_ARCHIVE_DIRECTORY = 10000
64
65 # member lock names to be passed to @ssynchronized decorator
66 _LOCK = "_lock"
67 _QUEUE = "_queue"
68
69
70 class CancelJob(Exception):
71   """Special exception to cancel a job.
72
73   """
74
75
76 def TimeStampNow():
77   """Returns the current timestamp.
78
79   @rtype: tuple
80   @return: the current time in the (seconds, microseconds) format
81
82   """
83   return utils.SplitTime(time.time())
84
85
86 class _QueuedOpCode(object):
87   """Encapsulates an opcode object.
88
89   @ivar log: holds the execution log and consists of tuples
90   of the form C{(log_serial, timestamp, level, message)}
91   @ivar input: the OpCode we encapsulate
92   @ivar status: the current status
93   @ivar result: the result of the LU execution
94   @ivar start_timestamp: timestamp for the start of the execution
95   @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
96   @ivar stop_timestamp: timestamp for the end of the execution
97
98   """
99   __slots__ = ["input", "status", "result", "log", "priority",
100                "start_timestamp", "exec_timestamp", "end_timestamp",
101                "__weakref__"]
102
103   def __init__(self, op):
104     """Constructor for the _QuededOpCode.
105
106     @type op: L{opcodes.OpCode}
107     @param op: the opcode we encapsulate
108
109     """
110     self.input = op
111     self.status = constants.OP_STATUS_QUEUED
112     self.result = None
113     self.log = []
114     self.start_timestamp = None
115     self.exec_timestamp = None
116     self.end_timestamp = None
117
118     # Get initial priority (it might change during the lifetime of this opcode)
119     self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
120
121   @classmethod
122   def Restore(cls, state):
123     """Restore the _QueuedOpCode from the serialized form.
124
125     @type state: dict
126     @param state: the serialized state
127     @rtype: _QueuedOpCode
128     @return: a new _QueuedOpCode instance
129
130     """
131     obj = _QueuedOpCode.__new__(cls)
132     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
133     obj.status = state["status"]
134     obj.result = state["result"]
135     obj.log = state["log"]
136     obj.start_timestamp = state.get("start_timestamp", None)
137     obj.exec_timestamp = state.get("exec_timestamp", None)
138     obj.end_timestamp = state.get("end_timestamp", None)
139     obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
140     return obj
141
142   def Serialize(self):
143     """Serializes this _QueuedOpCode.
144
145     @rtype: dict
146     @return: the dictionary holding the serialized state
147
148     """
149     return {
150       "input": self.input.__getstate__(),
151       "status": self.status,
152       "result": self.result,
153       "log": self.log,
154       "start_timestamp": self.start_timestamp,
155       "exec_timestamp": self.exec_timestamp,
156       "end_timestamp": self.end_timestamp,
157       "priority": self.priority,
158       }
159
160
161 class _QueuedJob(object):
162   """In-memory job representation.
163
164   This is what we use to track the user-submitted jobs. Locking must
165   be taken care of by users of this class.
166
167   @type queue: L{JobQueue}
168   @ivar queue: the parent queue
169   @ivar id: the job ID
170   @type ops: list
171   @ivar ops: the list of _QueuedOpCode that constitute the job
172   @type log_serial: int
173   @ivar log_serial: holds the index for the next log entry
174   @ivar received_timestamp: the timestamp for when the job was received
175   @ivar start_timestmap: the timestamp for start of execution
176   @ivar end_timestamp: the timestamp for end of execution
177   @ivar writable: Whether the job is allowed to be modified
178
179   """
180   # pylint: disable=W0212
181   __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
182                "received_timestamp", "start_timestamp", "end_timestamp",
183                "__weakref__", "processor_lock", "writable"]
184
185   def __init__(self, queue, job_id, ops, writable):
186     """Constructor for the _QueuedJob.
187
188     @type queue: L{JobQueue}
189     @param queue: our parent queue
190     @type job_id: job_id
191     @param job_id: our job id
192     @type ops: list
193     @param ops: the list of opcodes we hold, which will be encapsulated
194         in _QueuedOpCodes
195     @type writable: bool
196     @param writable: Whether job can be modified
197
198     """
199     if not ops:
200       raise errors.GenericError("A job needs at least one opcode")
201
202     self.queue = queue
203     self.id = job_id
204     self.ops = [_QueuedOpCode(op) for op in ops]
205     self.log_serial = 0
206     self.received_timestamp = TimeStampNow()
207     self.start_timestamp = None
208     self.end_timestamp = None
209
210     self._InitInMemory(self, writable)
211
212   @staticmethod
213   def _InitInMemory(obj, writable):
214     """Initializes in-memory variables.
215
216     """
217     obj.writable = writable
218     obj.ops_iter = None
219     obj.cur_opctx = None
220
221     # Read-only jobs are not processed and therefore don't need a lock
222     if writable:
223       obj.processor_lock = threading.Lock()
224     else:
225       obj.processor_lock = None
226
227   def __repr__(self):
228     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
229               "id=%s" % self.id,
230               "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
231
232     return "<%s at %#x>" % (" ".join(status), id(self))
233
234   @classmethod
235   def Restore(cls, queue, state, writable):
236     """Restore a _QueuedJob from serialized state:
237
238     @type queue: L{JobQueue}
239     @param queue: to which queue the restored job belongs
240     @type state: dict
241     @param state: the serialized state
242     @type writable: bool
243     @param writable: Whether job can be modified
244     @rtype: _JobQueue
245     @return: the restored _JobQueue instance
246
247     """
248     obj = _QueuedJob.__new__(cls)
249     obj.queue = queue
250     obj.id = state["id"]
251     obj.received_timestamp = state.get("received_timestamp", None)
252     obj.start_timestamp = state.get("start_timestamp", None)
253     obj.end_timestamp = state.get("end_timestamp", None)
254
255     obj.ops = []
256     obj.log_serial = 0
257     for op_state in state["ops"]:
258       op = _QueuedOpCode.Restore(op_state)
259       for log_entry in op.log:
260         obj.log_serial = max(obj.log_serial, log_entry[0])
261       obj.ops.append(op)
262
263     cls._InitInMemory(obj, writable)
264
265     return obj
266
267   def Serialize(self):
268     """Serialize the _JobQueue instance.
269
270     @rtype: dict
271     @return: the serialized state
272
273     """
274     return {
275       "id": self.id,
276       "ops": [op.Serialize() for op in self.ops],
277       "start_timestamp": self.start_timestamp,
278       "end_timestamp": self.end_timestamp,
279       "received_timestamp": self.received_timestamp,
280       }
281
282   def CalcStatus(self):
283     """Compute the status of this job.
284
285     This function iterates over all the _QueuedOpCodes in the job and
286     based on their status, computes the job status.
287
288     The algorithm is:
289       - if we find a cancelled, or finished with error, the job
290         status will be the same
291       - otherwise, the last opcode with the status one of:
292           - waitlock
293           - canceling
294           - running
295
296         will determine the job status
297
298       - otherwise, it means either all opcodes are queued, or success,
299         and the job status will be the same
300
301     @return: the job status
302
303     """
304     status = constants.JOB_STATUS_QUEUED
305
306     all_success = True
307     for op in self.ops:
308       if op.status == constants.OP_STATUS_SUCCESS:
309         continue
310
311       all_success = False
312
313       if op.status == constants.OP_STATUS_QUEUED:
314         pass
315       elif op.status == constants.OP_STATUS_WAITING:
316         status = constants.JOB_STATUS_WAITING
317       elif op.status == constants.OP_STATUS_RUNNING:
318         status = constants.JOB_STATUS_RUNNING
319       elif op.status == constants.OP_STATUS_CANCELING:
320         status = constants.JOB_STATUS_CANCELING
321         break
322       elif op.status == constants.OP_STATUS_ERROR:
323         status = constants.JOB_STATUS_ERROR
324         # The whole job fails if one opcode failed
325         break
326       elif op.status == constants.OP_STATUS_CANCELED:
327         status = constants.OP_STATUS_CANCELED
328         break
329
330     if all_success:
331       status = constants.JOB_STATUS_SUCCESS
332
333     return status
334
335   def CalcPriority(self):
336     """Gets the current priority for this job.
337
338     Only unfinished opcodes are considered. When all are done, the default
339     priority is used.
340
341     @rtype: int
342
343     """
344     priorities = [op.priority for op in self.ops
345                   if op.status not in constants.OPS_FINALIZED]
346
347     if not priorities:
348       # All opcodes are done, assume default priority
349       return constants.OP_PRIO_DEFAULT
350
351     return min(priorities)
352
353   def GetLogEntries(self, newer_than):
354     """Selectively returns the log entries.
355
356     @type newer_than: None or int
357     @param newer_than: if this is None, return all log entries,
358         otherwise return only the log entries with serial higher
359         than this value
360     @rtype: list
361     @return: the list of the log entries selected
362
363     """
364     if newer_than is None:
365       serial = -1
366     else:
367       serial = newer_than
368
369     entries = []
370     for op in self.ops:
371       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
372
373     return entries
374
375   def GetInfo(self, fields):
376     """Returns information about a job.
377
378     @type fields: list
379     @param fields: names of fields to return
380     @rtype: list
381     @return: list with one element for each field
382     @raise errors.OpExecError: when an invalid field
383         has been passed
384
385     """
386     row = []
387     for fname in fields:
388       if fname == "id":
389         row.append(self.id)
390       elif fname == "status":
391         row.append(self.CalcStatus())
392       elif fname == "priority":
393         row.append(self.CalcPriority())
394       elif fname == "ops":
395         row.append([op.input.__getstate__() for op in self.ops])
396       elif fname == "opresult":
397         row.append([op.result for op in self.ops])
398       elif fname == "opstatus":
399         row.append([op.status for op in self.ops])
400       elif fname == "oplog":
401         row.append([op.log for op in self.ops])
402       elif fname == "opstart":
403         row.append([op.start_timestamp for op in self.ops])
404       elif fname == "opexec":
405         row.append([op.exec_timestamp for op in self.ops])
406       elif fname == "opend":
407         row.append([op.end_timestamp for op in self.ops])
408       elif fname == "oppriority":
409         row.append([op.priority for op in self.ops])
410       elif fname == "received_ts":
411         row.append(self.received_timestamp)
412       elif fname == "start_ts":
413         row.append(self.start_timestamp)
414       elif fname == "end_ts":
415         row.append(self.end_timestamp)
416       elif fname == "summary":
417         row.append([op.input.Summary() for op in self.ops])
418       else:
419         raise errors.OpExecError("Invalid self query field '%s'" % fname)
420     return row
421
422   def MarkUnfinishedOps(self, status, result):
423     """Mark unfinished opcodes with a given status and result.
424
425     This is an utility function for marking all running or waiting to
426     be run opcodes with a given status. Opcodes which are already
427     finalised are not changed.
428
429     @param status: a given opcode status
430     @param result: the opcode result
431
432     """
433     not_marked = True
434     for op in self.ops:
435       if op.status in constants.OPS_FINALIZED:
436         assert not_marked, "Finalized opcodes found after non-finalized ones"
437         continue
438       op.status = status
439       op.result = result
440       not_marked = False
441
442   def Finalize(self):
443     """Marks the job as finalized.
444
445     """
446     self.end_timestamp = TimeStampNow()
447
448   def Cancel(self):
449     """Marks job as canceled/-ing if possible.
450
451     @rtype: tuple; (bool, string)
452     @return: Boolean describing whether job was successfully canceled or marked
453       as canceling and a text message
454
455     """
456     status = self.CalcStatus()
457
458     if status == constants.JOB_STATUS_QUEUED:
459       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
460                              "Job canceled by request")
461       self.Finalize()
462       return (True, "Job %s canceled" % self.id)
463
464     elif status == constants.JOB_STATUS_WAITING:
465       # The worker will notice the new status and cancel the job
466       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
467       return (True, "Job %s will be canceled" % self.id)
468
469     else:
470       logging.debug("Job %s is no longer waiting in the queue", self.id)
471       return (False, "Job %s is no longer waiting in the queue" % self.id)
472
473
474 class _OpExecCallbacks(mcpu.OpExecCbBase):
475   def __init__(self, queue, job, op):
476     """Initializes this class.
477
478     @type queue: L{JobQueue}
479     @param queue: Job queue
480     @type job: L{_QueuedJob}
481     @param job: Job object
482     @type op: L{_QueuedOpCode}
483     @param op: OpCode
484
485     """
486     assert queue, "Queue is missing"
487     assert job, "Job is missing"
488     assert op, "Opcode is missing"
489
490     self._queue = queue
491     self._job = job
492     self._op = op
493
494   def _CheckCancel(self):
495     """Raises an exception to cancel the job if asked to.
496
497     """
498     # Cancel here if we were asked to
499     if self._op.status == constants.OP_STATUS_CANCELING:
500       logging.debug("Canceling opcode")
501       raise CancelJob()
502
503   @locking.ssynchronized(_QUEUE, shared=1)
504   def NotifyStart(self):
505     """Mark the opcode as running, not lock-waiting.
506
507     This is called from the mcpu code as a notifier function, when the LU is
508     finally about to start the Exec() method. Of course, to have end-user
509     visible results, the opcode must be initially (before calling into
510     Processor.ExecOpCode) set to OP_STATUS_WAITING.
511
512     """
513     assert self._op in self._job.ops
514     assert self._op.status in (constants.OP_STATUS_WAITING,
515                                constants.OP_STATUS_CANCELING)
516
517     # Cancel here if we were asked to
518     self._CheckCancel()
519
520     logging.debug("Opcode is now running")
521
522     self._op.status = constants.OP_STATUS_RUNNING
523     self._op.exec_timestamp = TimeStampNow()
524
525     # And finally replicate the job status
526     self._queue.UpdateJobUnlocked(self._job)
527
528   @locking.ssynchronized(_QUEUE, shared=1)
529   def _AppendFeedback(self, timestamp, log_type, log_msg):
530     """Internal feedback append function, with locks
531
532     """
533     self._job.log_serial += 1
534     self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
535     self._queue.UpdateJobUnlocked(self._job, replicate=False)
536
537   def Feedback(self, *args):
538     """Append a log entry.
539
540     """
541     assert len(args) < 3
542
543     if len(args) == 1:
544       log_type = constants.ELOG_MESSAGE
545       log_msg = args[0]
546     else:
547       (log_type, log_msg) = args
548
549     # The time is split to make serialization easier and not lose
550     # precision.
551     timestamp = utils.SplitTime(time.time())
552     self._AppendFeedback(timestamp, log_type, log_msg)
553
554   def CheckCancel(self):
555     """Check whether job has been cancelled.
556
557     """
558     assert self._op.status in (constants.OP_STATUS_WAITING,
559                                constants.OP_STATUS_CANCELING)
560
561     # Cancel here if we were asked to
562     self._CheckCancel()
563
564   def SubmitManyJobs(self, jobs):
565     """Submits jobs for processing.
566
567     See L{JobQueue.SubmitManyJobs}.
568
569     """
570     # Locking is done in job queue
571     return self._queue.SubmitManyJobs(jobs)
572
573
574 class _JobChangesChecker(object):
575   def __init__(self, fields, prev_job_info, prev_log_serial):
576     """Initializes this class.
577
578     @type fields: list of strings
579     @param fields: Fields requested by LUXI client
580     @type prev_job_info: string
581     @param prev_job_info: previous job info, as passed by the LUXI client
582     @type prev_log_serial: string
583     @param prev_log_serial: previous job serial, as passed by the LUXI client
584
585     """
586     self._fields = fields
587     self._prev_job_info = prev_job_info
588     self._prev_log_serial = prev_log_serial
589
590   def __call__(self, job):
591     """Checks whether job has changed.
592
593     @type job: L{_QueuedJob}
594     @param job: Job object
595
596     """
597     assert not job.writable, "Expected read-only job"
598
599     status = job.CalcStatus()
600     job_info = job.GetInfo(self._fields)
601     log_entries = job.GetLogEntries(self._prev_log_serial)
602
603     # Serializing and deserializing data can cause type changes (e.g. from
604     # tuple to list) or precision loss. We're doing it here so that we get
605     # the same modifications as the data received from the client. Without
606     # this, the comparison afterwards might fail without the data being
607     # significantly different.
608     # TODO: we just deserialized from disk, investigate how to make sure that
609     # the job info and log entries are compatible to avoid this further step.
610     # TODO: Doing something like in testutils.py:UnifyValueType might be more
611     # efficient, though floats will be tricky
612     job_info = serializer.LoadJson(serializer.DumpJson(job_info))
613     log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
614
615     # Don't even try to wait if the job is no longer running, there will be
616     # no changes.
617     if (status not in (constants.JOB_STATUS_QUEUED,
618                        constants.JOB_STATUS_RUNNING,
619                        constants.JOB_STATUS_WAITING) or
620         job_info != self._prev_job_info or
621         (log_entries and self._prev_log_serial != log_entries[0][0])):
622       logging.debug("Job %s changed", job.id)
623       return (job_info, log_entries)
624
625     return None
626
627
628 class _JobFileChangesWaiter(object):
629   def __init__(self, filename):
630     """Initializes this class.
631
632     @type filename: string
633     @param filename: Path to job file
634     @raises errors.InotifyError: if the notifier cannot be setup
635
636     """
637     self._wm = pyinotify.WatchManager()
638     self._inotify_handler = \
639       asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
640     self._notifier = \
641       pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
642     try:
643       self._inotify_handler.enable()
644     except Exception:
645       # pyinotify doesn't close file descriptors automatically
646       self._notifier.stop()
647       raise
648
649   def _OnInotify(self, notifier_enabled):
650     """Callback for inotify.
651
652     """
653     if not notifier_enabled:
654       self._inotify_handler.enable()
655
656   def Wait(self, timeout):
657     """Waits for the job file to change.
658
659     @type timeout: float
660     @param timeout: Timeout in seconds
661     @return: Whether there have been events
662
663     """
664     assert timeout >= 0
665     have_events = self._notifier.check_events(timeout * 1000)
666     if have_events:
667       self._notifier.read_events()
668     self._notifier.process_events()
669     return have_events
670
671   def Close(self):
672     """Closes underlying notifier and its file descriptor.
673
674     """
675     self._notifier.stop()
676
677
678 class _JobChangesWaiter(object):
679   def __init__(self, filename):
680     """Initializes this class.
681
682     @type filename: string
683     @param filename: Path to job file
684
685     """
686     self._filewaiter = None
687     self._filename = filename
688
689   def Wait(self, timeout):
690     """Waits for a job to change.
691
692     @type timeout: float
693     @param timeout: Timeout in seconds
694     @return: Whether there have been events
695
696     """
697     if self._filewaiter:
698       return self._filewaiter.Wait(timeout)
699
700     # Lazy setup: Avoid inotify setup cost when job file has already changed.
701     # If this point is reached, return immediately and let caller check the job
702     # file again in case there were changes since the last check. This avoids a
703     # race condition.
704     self._filewaiter = _JobFileChangesWaiter(self._filename)
705
706     return True
707
708   def Close(self):
709     """Closes underlying waiter.
710
711     """
712     if self._filewaiter:
713       self._filewaiter.Close()
714
715
716 class _WaitForJobChangesHelper(object):
717   """Helper class using inotify to wait for changes in a job file.
718
719   This class takes a previous job status and serial, and alerts the client when
720   the current job status has changed.
721
722   """
723   @staticmethod
724   def _CheckForChanges(counter, job_load_fn, check_fn):
725     if counter.next() > 0:
726       # If this isn't the first check the job is given some more time to change
727       # again. This gives better performance for jobs generating many
728       # changes/messages.
729       time.sleep(0.1)
730
731     job = job_load_fn()
732     if not job:
733       raise errors.JobLost()
734
735     result = check_fn(job)
736     if result is None:
737       raise utils.RetryAgain()
738
739     return result
740
741   def __call__(self, filename, job_load_fn,
742                fields, prev_job_info, prev_log_serial, timeout):
743     """Waits for changes on a job.
744
745     @type filename: string
746     @param filename: File on which to wait for changes
747     @type job_load_fn: callable
748     @param job_load_fn: Function to load job
749     @type fields: list of strings
750     @param fields: Which fields to check for changes
751     @type prev_job_info: list or None
752     @param prev_job_info: Last job information returned
753     @type prev_log_serial: int
754     @param prev_log_serial: Last job message serial number
755     @type timeout: float
756     @param timeout: maximum time to wait in seconds
757
758     """
759     counter = itertools.count()
760     try:
761       check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
762       waiter = _JobChangesWaiter(filename)
763       try:
764         return utils.Retry(compat.partial(self._CheckForChanges,
765                                           counter, job_load_fn, check_fn),
766                            utils.RETRY_REMAINING_TIME, timeout,
767                            wait_fn=waiter.Wait)
768       finally:
769         waiter.Close()
770     except (errors.InotifyError, errors.JobLost):
771       return None
772     except utils.RetryTimeout:
773       return constants.JOB_NOTCHANGED
774
775
776 def _EncodeOpError(err):
777   """Encodes an error which occurred while processing an opcode.
778
779   """
780   if isinstance(err, errors.GenericError):
781     to_encode = err
782   else:
783     to_encode = errors.OpExecError(str(err))
784
785   return errors.EncodeException(to_encode)
786
787
788 class _TimeoutStrategyWrapper:
789   def __init__(self, fn):
790     """Initializes this class.
791
792     """
793     self._fn = fn
794     self._next = None
795
796   def _Advance(self):
797     """Gets the next timeout if necessary.
798
799     """
800     if self._next is None:
801       self._next = self._fn()
802
803   def Peek(self):
804     """Returns the next timeout.
805
806     """
807     self._Advance()
808     return self._next
809
810   def Next(self):
811     """Returns the current timeout and advances the internal state.
812
813     """
814     self._Advance()
815     result = self._next
816     self._next = None
817     return result
818
819
820 class _OpExecContext:
821   def __init__(self, op, index, log_prefix, timeout_strategy_factory):
822     """Initializes this class.
823
824     """
825     self.op = op
826     self.index = index
827     self.log_prefix = log_prefix
828     self.summary = op.input.Summary()
829
830     # Create local copy to modify
831     if getattr(op.input, opcodes.DEPEND_ATTR, None):
832       self.jobdeps = op.input.depends[:]
833     else:
834       self.jobdeps = None
835
836     self._timeout_strategy_factory = timeout_strategy_factory
837     self._ResetTimeoutStrategy()
838
839   def _ResetTimeoutStrategy(self):
840     """Creates a new timeout strategy.
841
842     """
843     self._timeout_strategy = \
844       _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
845
846   def CheckPriorityIncrease(self):
847     """Checks whether priority can and should be increased.
848
849     Called when locks couldn't be acquired.
850
851     """
852     op = self.op
853
854     # Exhausted all retries and next round should not use blocking acquire
855     # for locks?
856     if (self._timeout_strategy.Peek() is None and
857         op.priority > constants.OP_PRIO_HIGHEST):
858       logging.debug("Increasing priority")
859       op.priority -= 1
860       self._ResetTimeoutStrategy()
861       return True
862
863     return False
864
865   def GetNextLockTimeout(self):
866     """Returns the next lock acquire timeout.
867
868     """
869     return self._timeout_strategy.Next()
870
871
872 class _JobProcessor(object):
873   (DEFER,
874    WAITDEP,
875    FINISHED) = range(1, 4)
876
877   def __init__(self, queue, opexec_fn, job,
878                _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
879     """Initializes this class.
880
881     """
882     self.queue = queue
883     self.opexec_fn = opexec_fn
884     self.job = job
885     self._timeout_strategy_factory = _timeout_strategy_factory
886
887   @staticmethod
888   def _FindNextOpcode(job, timeout_strategy_factory):
889     """Locates the next opcode to run.
890
891     @type job: L{_QueuedJob}
892     @param job: Job object
893     @param timeout_strategy_factory: Callable to create new timeout strategy
894
895     """
896     # Create some sort of a cache to speed up locating next opcode for future
897     # lookups
898     # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
899     # pending and one for processed ops.
900     if job.ops_iter is None:
901       job.ops_iter = enumerate(job.ops)
902
903     # Find next opcode to run
904     while True:
905       try:
906         (idx, op) = job.ops_iter.next()
907       except StopIteration:
908         raise errors.ProgrammerError("Called for a finished job")
909
910       if op.status == constants.OP_STATUS_RUNNING:
911         # Found an opcode already marked as running
912         raise errors.ProgrammerError("Called for job marked as running")
913
914       opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
915                              timeout_strategy_factory)
916
917       if op.status not in constants.OPS_FINALIZED:
918         return opctx
919
920       # This is a job that was partially completed before master daemon
921       # shutdown, so it can be expected that some opcodes are already
922       # completed successfully (if any did error out, then the whole job
923       # should have been aborted and not resubmitted for processing).
924       logging.info("%s: opcode %s already processed, skipping",
925                    opctx.log_prefix, opctx.summary)
926
927   @staticmethod
928   def _MarkWaitlock(job, op):
929     """Marks an opcode as waiting for locks.
930
931     The job's start timestamp is also set if necessary.
932
933     @type job: L{_QueuedJob}
934     @param job: Job object
935     @type op: L{_QueuedOpCode}
936     @param op: Opcode object
937
938     """
939     assert op in job.ops
940     assert op.status in (constants.OP_STATUS_QUEUED,
941                          constants.OP_STATUS_WAITING)
942
943     update = False
944
945     op.result = None
946
947     if op.status == constants.OP_STATUS_QUEUED:
948       op.status = constants.OP_STATUS_WAITING
949       update = True
950
951     if op.start_timestamp is None:
952       op.start_timestamp = TimeStampNow()
953       update = True
954
955     if job.start_timestamp is None:
956       job.start_timestamp = op.start_timestamp
957       update = True
958
959     assert op.status == constants.OP_STATUS_WAITING
960
961     return update
962
963   @staticmethod
964   def _CheckDependencies(queue, job, opctx):
965     """Checks if an opcode has dependencies and if so, processes them.
966
967     @type queue: L{JobQueue}
968     @param queue: Queue object
969     @type job: L{_QueuedJob}
970     @param job: Job object
971     @type opctx: L{_OpExecContext}
972     @param opctx: Opcode execution context
973     @rtype: bool
974     @return: Whether opcode will be re-scheduled by dependency tracker
975
976     """
977     op = opctx.op
978
979     result = False
980
981     while opctx.jobdeps:
982       (dep_job_id, dep_status) = opctx.jobdeps[0]
983
984       (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
985                                                           dep_status)
986       assert ht.TNonEmptyString(depmsg), "No dependency message"
987
988       logging.info("%s: %s", opctx.log_prefix, depmsg)
989
990       if depresult == _JobDependencyManager.CONTINUE:
991         # Remove dependency and continue
992         opctx.jobdeps.pop(0)
993
994       elif depresult == _JobDependencyManager.WAIT:
995         # Need to wait for notification, dependency tracker will re-add job
996         # to workerpool
997         result = True
998         break
999
1000       elif depresult == _JobDependencyManager.CANCEL:
1001         # Job was cancelled, cancel this job as well
1002         job.Cancel()
1003         assert op.status == constants.OP_STATUS_CANCELING
1004         break
1005
1006       elif depresult in (_JobDependencyManager.WRONGSTATUS,
1007                          _JobDependencyManager.ERROR):
1008         # Job failed or there was an error, this job must fail
1009         op.status = constants.OP_STATUS_ERROR
1010         op.result = _EncodeOpError(errors.OpExecError(depmsg))
1011         break
1012
1013       else:
1014         raise errors.ProgrammerError("Unknown dependency result '%s'" %
1015                                      depresult)
1016
1017     return result
1018
1019   def _ExecOpCodeUnlocked(self, opctx):
1020     """Processes one opcode and returns the result.
1021
1022     """
1023     op = opctx.op
1024
1025     assert op.status == constants.OP_STATUS_WAITING
1026
1027     timeout = opctx.GetNextLockTimeout()
1028
1029     try:
1030       # Make sure not to hold queue lock while calling ExecOpCode
1031       result = self.opexec_fn(op.input,
1032                               _OpExecCallbacks(self.queue, self.job, op),
1033                               timeout=timeout, priority=op.priority)
1034     except mcpu.LockAcquireTimeout:
1035       assert timeout is not None, "Received timeout for blocking acquire"
1036       logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1037
1038       assert op.status in (constants.OP_STATUS_WAITING,
1039                            constants.OP_STATUS_CANCELING)
1040
1041       # Was job cancelled while we were waiting for the lock?
1042       if op.status == constants.OP_STATUS_CANCELING:
1043         return (constants.OP_STATUS_CANCELING, None)
1044
1045       # Stay in waitlock while trying to re-acquire lock
1046       return (constants.OP_STATUS_WAITING, None)
1047     except CancelJob:
1048       logging.exception("%s: Canceling job", opctx.log_prefix)
1049       assert op.status == constants.OP_STATUS_CANCELING
1050       return (constants.OP_STATUS_CANCELING, None)
1051     except Exception, err: # pylint: disable=W0703
1052       logging.exception("%s: Caught exception in %s",
1053                         opctx.log_prefix, opctx.summary)
1054       return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1055     else:
1056       logging.debug("%s: %s successful",
1057                     opctx.log_prefix, opctx.summary)
1058       return (constants.OP_STATUS_SUCCESS, result)
1059
1060   def __call__(self, _nextop_fn=None):
1061     """Continues execution of a job.
1062
1063     @param _nextop_fn: Callback function for tests
1064     @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1065       be deferred and C{WAITDEP} if the dependency manager
1066       (L{_JobDependencyManager}) will re-schedule the job when appropriate
1067
1068     """
1069     queue = self.queue
1070     job = self.job
1071
1072     logging.debug("Processing job %s", job.id)
1073
1074     queue.acquire(shared=1)
1075     try:
1076       opcount = len(job.ops)
1077
1078       assert job.writable, "Expected writable job"
1079
1080       # Don't do anything for finalized jobs
1081       if job.CalcStatus() in constants.JOBS_FINALIZED:
1082         return self.FINISHED
1083
1084       # Is a previous opcode still pending?
1085       if job.cur_opctx:
1086         opctx = job.cur_opctx
1087         job.cur_opctx = None
1088       else:
1089         if __debug__ and _nextop_fn:
1090           _nextop_fn()
1091         opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1092
1093       op = opctx.op
1094
1095       # Consistency check
1096       assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1097                                      constants.OP_STATUS_CANCELING)
1098                         for i in job.ops[opctx.index + 1:])
1099
1100       assert op.status in (constants.OP_STATUS_QUEUED,
1101                            constants.OP_STATUS_WAITING,
1102                            constants.OP_STATUS_CANCELING)
1103
1104       assert (op.priority <= constants.OP_PRIO_LOWEST and
1105               op.priority >= constants.OP_PRIO_HIGHEST)
1106
1107       waitjob = None
1108
1109       if op.status != constants.OP_STATUS_CANCELING:
1110         assert op.status in (constants.OP_STATUS_QUEUED,
1111                              constants.OP_STATUS_WAITING)
1112
1113         # Prepare to start opcode
1114         if self._MarkWaitlock(job, op):
1115           # Write to disk
1116           queue.UpdateJobUnlocked(job)
1117
1118         assert op.status == constants.OP_STATUS_WAITING
1119         assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1120         assert job.start_timestamp and op.start_timestamp
1121         assert waitjob is None
1122
1123         # Check if waiting for a job is necessary
1124         waitjob = self._CheckDependencies(queue, job, opctx)
1125
1126         assert op.status in (constants.OP_STATUS_WAITING,
1127                              constants.OP_STATUS_CANCELING,
1128                              constants.OP_STATUS_ERROR)
1129
1130         if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1131                                          constants.OP_STATUS_ERROR)):
1132           logging.info("%s: opcode %s waiting for locks",
1133                        opctx.log_prefix, opctx.summary)
1134
1135           assert not opctx.jobdeps, "Not all dependencies were removed"
1136
1137           queue.release()
1138           try:
1139             (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1140           finally:
1141             queue.acquire(shared=1)
1142
1143           op.status = op_status
1144           op.result = op_result
1145
1146           assert not waitjob
1147
1148         if op.status == constants.OP_STATUS_WAITING:
1149           # Couldn't get locks in time
1150           assert not op.end_timestamp
1151         else:
1152           # Finalize opcode
1153           op.end_timestamp = TimeStampNow()
1154
1155           if op.status == constants.OP_STATUS_CANCELING:
1156             assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1157                                   for i in job.ops[opctx.index:])
1158           else:
1159             assert op.status in constants.OPS_FINALIZED
1160
1161       if op.status == constants.OP_STATUS_WAITING or waitjob:
1162         finalize = False
1163
1164         if not waitjob and opctx.CheckPriorityIncrease():
1165           # Priority was changed, need to update on-disk file
1166           queue.UpdateJobUnlocked(job)
1167
1168         # Keep around for another round
1169         job.cur_opctx = opctx
1170
1171         assert (op.priority <= constants.OP_PRIO_LOWEST and
1172                 op.priority >= constants.OP_PRIO_HIGHEST)
1173
1174         # In no case must the status be finalized here
1175         assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1176
1177       else:
1178         # Ensure all opcodes so far have been successful
1179         assert (opctx.index == 0 or
1180                 compat.all(i.status == constants.OP_STATUS_SUCCESS
1181                            for i in job.ops[:opctx.index]))
1182
1183         # Reset context
1184         job.cur_opctx = None
1185
1186         if op.status == constants.OP_STATUS_SUCCESS:
1187           finalize = False
1188
1189         elif op.status == constants.OP_STATUS_ERROR:
1190           # Ensure failed opcode has an exception as its result
1191           assert errors.GetEncodedError(job.ops[opctx.index].result)
1192
1193           to_encode = errors.OpExecError("Preceding opcode failed")
1194           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1195                                 _EncodeOpError(to_encode))
1196           finalize = True
1197
1198           # Consistency check
1199           assert compat.all(i.status == constants.OP_STATUS_ERROR and
1200                             errors.GetEncodedError(i.result)
1201                             for i in job.ops[opctx.index:])
1202
1203         elif op.status == constants.OP_STATUS_CANCELING:
1204           job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1205                                 "Job canceled by request")
1206           finalize = True
1207
1208         else:
1209           raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1210
1211         if opctx.index == (opcount - 1):
1212           # Finalize on last opcode
1213           finalize = True
1214
1215         if finalize:
1216           # All opcodes have been run, finalize job
1217           job.Finalize()
1218
1219         # Write to disk. If the job status is final, this is the final write
1220         # allowed. Once the file has been written, it can be archived anytime.
1221         queue.UpdateJobUnlocked(job)
1222
1223         assert not waitjob
1224
1225         if finalize:
1226           logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1227           return self.FINISHED
1228
1229       assert not waitjob or queue.depmgr.JobWaiting(job)
1230
1231       if waitjob:
1232         return self.WAITDEP
1233       else:
1234         return self.DEFER
1235     finally:
1236       assert job.writable, "Job became read-only while being processed"
1237       queue.release()
1238
1239
1240 def _EvaluateJobProcessorResult(depmgr, job, result):
1241   """Looks at a result from L{_JobProcessor} for a job.
1242
1243   To be used in a L{_JobQueueWorker}.
1244
1245   """
1246   if result == _JobProcessor.FINISHED:
1247     # Notify waiting jobs
1248     depmgr.NotifyWaiters(job.id)
1249
1250   elif result == _JobProcessor.DEFER:
1251     # Schedule again
1252     raise workerpool.DeferTask(priority=job.CalcPriority())
1253
1254   elif result == _JobProcessor.WAITDEP:
1255     # No-op, dependency manager will re-schedule
1256     pass
1257
1258   else:
1259     raise errors.ProgrammerError("Job processor returned unknown status %s" %
1260                                  (result, ))
1261
1262
1263 class _JobQueueWorker(workerpool.BaseWorker):
1264   """The actual job workers.
1265
1266   """
1267   def RunTask(self, job): # pylint: disable=W0221
1268     """Job executor.
1269
1270     @type job: L{_QueuedJob}
1271     @param job: the job to be processed
1272
1273     """
1274     assert job.writable, "Expected writable job"
1275
1276     # Ensure only one worker is active on a single job. If a job registers for
1277     # a dependency job, and the other job notifies before the first worker is
1278     # done, the job can end up in the tasklist more than once.
1279     job.processor_lock.acquire()
1280     try:
1281       return self._RunTaskInner(job)
1282     finally:
1283       job.processor_lock.release()
1284
1285   def _RunTaskInner(self, job):
1286     """Executes a job.
1287
1288     Must be called with per-job lock acquired.
1289
1290     """
1291     queue = job.queue
1292     assert queue == self.pool.queue
1293
1294     setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1295     setname_fn(None)
1296
1297     proc = mcpu.Processor(queue.context, job.id)
1298
1299     # Create wrapper for setting thread name
1300     wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1301                                     proc.ExecOpCode)
1302
1303     _EvaluateJobProcessorResult(queue.depmgr, job,
1304                                 _JobProcessor(queue, wrap_execop_fn, job)())
1305
1306   @staticmethod
1307   def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1308     """Updates the worker thread name to include a short summary of the opcode.
1309
1310     @param setname_fn: Callable setting worker thread name
1311     @param execop_fn: Callable for executing opcode (usually
1312                       L{mcpu.Processor.ExecOpCode})
1313
1314     """
1315     setname_fn(op)
1316     try:
1317       return execop_fn(op, *args, **kwargs)
1318     finally:
1319       setname_fn(None)
1320
1321   @staticmethod
1322   def _GetWorkerName(job, op):
1323     """Sets the worker thread name.
1324
1325     @type job: L{_QueuedJob}
1326     @type op: L{opcodes.OpCode}
1327
1328     """
1329     parts = ["Job%s" % job.id]
1330
1331     if op:
1332       parts.append(op.TinySummary())
1333
1334     return "/".join(parts)
1335
1336
1337 class _JobQueueWorkerPool(workerpool.WorkerPool):
1338   """Simple class implementing a job-processing workerpool.
1339
1340   """
1341   def __init__(self, queue):
1342     super(_JobQueueWorkerPool, self).__init__("Jq",
1343                                               JOBQUEUE_THREADS,
1344                                               _JobQueueWorker)
1345     self.queue = queue
1346
1347
1348 class _JobDependencyManager:
1349   """Keeps track of job dependencies.
1350
1351   """
1352   (WAIT,
1353    ERROR,
1354    CANCEL,
1355    CONTINUE,
1356    WRONGSTATUS) = range(1, 6)
1357
1358   def __init__(self, getstatus_fn, enqueue_fn):
1359     """Initializes this class.
1360
1361     """
1362     self._getstatus_fn = getstatus_fn
1363     self._enqueue_fn = enqueue_fn
1364
1365     self._waiters = {}
1366     self._lock = locking.SharedLock("JobDepMgr")
1367
1368   @locking.ssynchronized(_LOCK, shared=1)
1369   def GetLockInfo(self, requested): # pylint: disable=W0613
1370     """Retrieves information about waiting jobs.
1371
1372     @type requested: set
1373     @param requested: Requested information, see C{query.LQ_*}
1374
1375     """
1376     # No need to sort here, that's being done by the lock manager and query
1377     # library. There are no priorities for notifying jobs, hence all show up as
1378     # one item under "pending".
1379     return [("job/%s" % job_id, None, None,
1380              [("job", [job.id for job in waiters])])
1381             for job_id, waiters in self._waiters.items()
1382             if waiters]
1383
1384   @locking.ssynchronized(_LOCK, shared=1)
1385   def JobWaiting(self, job):
1386     """Checks if a job is waiting.
1387
1388     """
1389     return compat.any(job in jobs
1390                       for jobs in self._waiters.values())
1391
1392   @locking.ssynchronized(_LOCK)
1393   def CheckAndRegister(self, job, dep_job_id, dep_status):
1394     """Checks if a dependency job has the requested status.
1395
1396     If the other job is not yet in a finalized status, the calling job will be
1397     notified (re-added to the workerpool) at a later point.
1398
1399     @type job: L{_QueuedJob}
1400     @param job: Job object
1401     @type dep_job_id: string
1402     @param dep_job_id: ID of dependency job
1403     @type dep_status: list
1404     @param dep_status: Required status
1405
1406     """
1407     assert ht.TString(job.id)
1408     assert ht.TString(dep_job_id)
1409     assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1410
1411     if job.id == dep_job_id:
1412       return (self.ERROR, "Job can't depend on itself")
1413
1414     # Get status of dependency job
1415     try:
1416       status = self._getstatus_fn(dep_job_id)
1417     except errors.JobLost, err:
1418       return (self.ERROR, "Dependency error: %s" % err)
1419
1420     assert status in constants.JOB_STATUS_ALL
1421
1422     job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1423
1424     if status not in constants.JOBS_FINALIZED:
1425       # Register for notification and wait for job to finish
1426       job_id_waiters.add(job)
1427       return (self.WAIT,
1428               "Need to wait for job %s, wanted status '%s'" %
1429               (dep_job_id, dep_status))
1430
1431     # Remove from waiters list
1432     if job in job_id_waiters:
1433       job_id_waiters.remove(job)
1434
1435     if (status == constants.JOB_STATUS_CANCELED and
1436         constants.JOB_STATUS_CANCELED not in dep_status):
1437       return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1438
1439     elif not dep_status or status in dep_status:
1440       return (self.CONTINUE,
1441               "Dependency job %s finished with status '%s'" %
1442               (dep_job_id, status))
1443
1444     else:
1445       return (self.WRONGSTATUS,
1446               "Dependency job %s finished with status '%s',"
1447               " not one of '%s' as required" %
1448               (dep_job_id, status, utils.CommaJoin(dep_status)))
1449
1450   def _RemoveEmptyWaitersUnlocked(self):
1451     """Remove all jobs without actual waiters.
1452
1453     """
1454     for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1455                    if not waiters]:
1456       del self._waiters[job_id]
1457
1458   def NotifyWaiters(self, job_id):
1459     """Notifies all jobs waiting for a certain job ID.
1460
1461     @attention: Do not call until L{CheckAndRegister} returned a status other
1462       than C{WAITDEP} for C{job_id}, or behaviour is undefined
1463     @type job_id: string
1464     @param job_id: Job ID
1465
1466     """
1467     assert ht.TString(job_id)
1468
1469     self._lock.acquire()
1470     try:
1471       self._RemoveEmptyWaitersUnlocked()
1472
1473       jobs = self._waiters.pop(job_id, None)
1474     finally:
1475       self._lock.release()
1476
1477     if jobs:
1478       # Re-add jobs to workerpool
1479       logging.debug("Re-adding %s jobs which were waiting for job %s",
1480                     len(jobs), job_id)
1481       self._enqueue_fn(jobs)
1482
1483
1484 def _RequireOpenQueue(fn):
1485   """Decorator for "public" functions.
1486
1487   This function should be used for all 'public' functions. That is,
1488   functions usually called from other classes. Note that this should
1489   be applied only to methods (not plain functions), since it expects
1490   that the decorated function is called with a first argument that has
1491   a '_queue_filelock' argument.
1492
1493   @warning: Use this decorator only after locking.ssynchronized
1494
1495   Example::
1496     @locking.ssynchronized(_LOCK)
1497     @_RequireOpenQueue
1498     def Example(self):
1499       pass
1500
1501   """
1502   def wrapper(self, *args, **kwargs):
1503     # pylint: disable=W0212
1504     assert self._queue_filelock is not None, "Queue should be open"
1505     return fn(self, *args, **kwargs)
1506   return wrapper
1507
1508
1509 def _RequireNonDrainedQueue(fn):
1510   """Decorator checking for a non-drained queue.
1511
1512   To be used with functions submitting new jobs.
1513
1514   """
1515   def wrapper(self, *args, **kwargs):
1516     """Wrapper function.
1517
1518     @raise errors.JobQueueDrainError: if the job queue is marked for draining
1519
1520     """
1521     # Ok when sharing the big job queue lock, as the drain file is created when
1522     # the lock is exclusive.
1523     # Needs access to protected member, pylint: disable=W0212
1524     if self._drained:
1525       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1526
1527     if not self._accepting_jobs:
1528       raise errors.JobQueueError("Job queue is shutting down, refusing job")
1529
1530     return fn(self, *args, **kwargs)
1531   return wrapper
1532
1533
1534 class JobQueue(object):
1535   """Queue used to manage the jobs.
1536
1537   """
1538   def __init__(self, context):
1539     """Constructor for JobQueue.
1540
1541     The constructor will initialize the job queue object and then
1542     start loading the current jobs from disk, either for starting them
1543     (if they were queue) or for aborting them (if they were already
1544     running).
1545
1546     @type context: GanetiContext
1547     @param context: the context object for access to the configuration
1548         data and other ganeti objects
1549
1550     """
1551     self.context = context
1552     self._memcache = weakref.WeakValueDictionary()
1553     self._my_hostname = netutils.Hostname.GetSysName()
1554
1555     # The Big JobQueue lock. If a code block or method acquires it in shared
1556     # mode safe it must guarantee concurrency with all the code acquiring it in
1557     # shared mode, including itself. In order not to acquire it at all
1558     # concurrency must be guaranteed with all code acquiring it in shared mode
1559     # and all code acquiring it exclusively.
1560     self._lock = locking.SharedLock("JobQueue")
1561
1562     self.acquire = self._lock.acquire
1563     self.release = self._lock.release
1564
1565     # Accept jobs by default
1566     self._accepting_jobs = True
1567
1568     # Initialize the queue, and acquire the filelock.
1569     # This ensures no other process is working on the job queue.
1570     self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1571
1572     # Read serial file
1573     self._last_serial = jstore.ReadSerial()
1574     assert self._last_serial is not None, ("Serial file was modified between"
1575                                            " check in jstore and here")
1576
1577     # Get initial list of nodes
1578     self._nodes = dict((n.name, n.primary_ip)
1579                        for n in self.context.cfg.GetAllNodesInfo().values()
1580                        if n.master_candidate)
1581
1582     # Remove master node
1583     self._nodes.pop(self._my_hostname, None)
1584
1585     # TODO: Check consistency across nodes
1586
1587     self._queue_size = None
1588     self._UpdateQueueSizeUnlocked()
1589     assert ht.TInt(self._queue_size)
1590     self._drained = jstore.CheckDrainFlag()
1591
1592     # Job dependencies
1593     self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1594                                         self._EnqueueJobs)
1595     self.context.glm.AddToLockMonitor(self.depmgr)
1596
1597     # Setup worker pool
1598     self._wpool = _JobQueueWorkerPool(self)
1599     try:
1600       self._InspectQueue()
1601     except:
1602       self._wpool.TerminateWorkers()
1603       raise
1604
1605   @locking.ssynchronized(_LOCK)
1606   @_RequireOpenQueue
1607   def _InspectQueue(self):
1608     """Loads the whole job queue and resumes unfinished jobs.
1609
1610     This function needs the lock here because WorkerPool.AddTask() may start a
1611     job while we're still doing our work.
1612
1613     """
1614     logging.info("Inspecting job queue")
1615
1616     restartjobs = []
1617
1618     all_job_ids = self._GetJobIDsUnlocked()
1619     jobs_count = len(all_job_ids)
1620     lastinfo = time.time()
1621     for idx, job_id in enumerate(all_job_ids):
1622       # Give an update every 1000 jobs or 10 seconds
1623       if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1624           idx == (jobs_count - 1)):
1625         logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1626                      idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1627         lastinfo = time.time()
1628
1629       job = self._LoadJobUnlocked(job_id)
1630
1631       # a failure in loading the job can cause 'None' to be returned
1632       if job is None:
1633         continue
1634
1635       status = job.CalcStatus()
1636
1637       if status == constants.JOB_STATUS_QUEUED:
1638         restartjobs.append(job)
1639
1640       elif status in (constants.JOB_STATUS_RUNNING,
1641                       constants.JOB_STATUS_WAITING,
1642                       constants.JOB_STATUS_CANCELING):
1643         logging.warning("Unfinished job %s found: %s", job.id, job)
1644
1645         if status == constants.JOB_STATUS_WAITING:
1646           # Restart job
1647           job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1648           restartjobs.append(job)
1649         else:
1650           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1651                                 "Unclean master daemon shutdown")
1652           job.Finalize()
1653
1654         self.UpdateJobUnlocked(job)
1655
1656     if restartjobs:
1657       logging.info("Restarting %s jobs", len(restartjobs))
1658       self._EnqueueJobsUnlocked(restartjobs)
1659
1660     logging.info("Job queue inspection finished")
1661
1662   def _GetRpc(self, address_list):
1663     """Gets RPC runner with context.
1664
1665     """
1666     return rpc.JobQueueRunner(self.context, address_list)
1667
1668   @locking.ssynchronized(_LOCK)
1669   @_RequireOpenQueue
1670   def AddNode(self, node):
1671     """Register a new node with the queue.
1672
1673     @type node: L{objects.Node}
1674     @param node: the node object to be added
1675
1676     """
1677     node_name = node.name
1678     assert node_name != self._my_hostname
1679
1680     # Clean queue directory on added node
1681     result = self._GetRpc(None).call_jobqueue_purge(node_name)
1682     msg = result.fail_msg
1683     if msg:
1684       logging.warning("Cannot cleanup queue directory on node %s: %s",
1685                       node_name, msg)
1686
1687     if not node.master_candidate:
1688       # remove if existing, ignoring errors
1689       self._nodes.pop(node_name, None)
1690       # and skip the replication of the job ids
1691       return
1692
1693     # Upload the whole queue excluding archived jobs
1694     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1695
1696     # Upload current serial file
1697     files.append(constants.JOB_QUEUE_SERIAL_FILE)
1698
1699     # Static address list
1700     addrs = [node.primary_ip]
1701
1702     for file_name in files:
1703       # Read file content
1704       content = utils.ReadFile(file_name)
1705
1706       result = self._GetRpc(addrs).call_jobqueue_update([node_name], file_name,
1707                                                         content)
1708       msg = result[node_name].fail_msg
1709       if msg:
1710         logging.error("Failed to upload file %s to node %s: %s",
1711                       file_name, node_name, msg)
1712
1713     self._nodes[node_name] = node.primary_ip
1714
1715   @locking.ssynchronized(_LOCK)
1716   @_RequireOpenQueue
1717   def RemoveNode(self, node_name):
1718     """Callback called when removing nodes from the cluster.
1719
1720     @type node_name: str
1721     @param node_name: the name of the node to remove
1722
1723     """
1724     self._nodes.pop(node_name, None)
1725
1726   @staticmethod
1727   def _CheckRpcResult(result, nodes, failmsg):
1728     """Verifies the status of an RPC call.
1729
1730     Since we aim to keep consistency should this node (the current
1731     master) fail, we will log errors if our rpc fail, and especially
1732     log the case when more than half of the nodes fails.
1733
1734     @param result: the data as returned from the rpc call
1735     @type nodes: list
1736     @param nodes: the list of nodes we made the call to
1737     @type failmsg: str
1738     @param failmsg: the identifier to be used for logging
1739
1740     """
1741     failed = []
1742     success = []
1743
1744     for node in nodes:
1745       msg = result[node].fail_msg
1746       if msg:
1747         failed.append(node)
1748         logging.error("RPC call %s (%s) failed on node %s: %s",
1749                       result[node].call, failmsg, node, msg)
1750       else:
1751         success.append(node)
1752
1753     # +1 for the master node
1754     if (len(success) + 1) < len(failed):
1755       # TODO: Handle failing nodes
1756       logging.error("More than half of the nodes failed")
1757
1758   def _GetNodeIp(self):
1759     """Helper for returning the node name/ip list.
1760
1761     @rtype: (list, list)
1762     @return: a tuple of two lists, the first one with the node
1763         names and the second one with the node addresses
1764
1765     """
1766     # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1767     name_list = self._nodes.keys()
1768     addr_list = [self._nodes[name] for name in name_list]
1769     return name_list, addr_list
1770
1771   def _UpdateJobQueueFile(self, file_name, data, replicate):
1772     """Writes a file locally and then replicates it to all nodes.
1773
1774     This function will replace the contents of a file on the local
1775     node and then replicate it to all the other nodes we have.
1776
1777     @type file_name: str
1778     @param file_name: the path of the file to be replicated
1779     @type data: str
1780     @param data: the new contents of the file
1781     @type replicate: boolean
1782     @param replicate: whether to spread the changes to the remote nodes
1783
1784     """
1785     getents = runtime.GetEnts()
1786     utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1787                     gid=getents.masterd_gid)
1788
1789     if replicate:
1790       names, addrs = self._GetNodeIp()
1791       result = self._GetRpc(addrs).call_jobqueue_update(names, file_name, data)
1792       self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1793
1794   def _RenameFilesUnlocked(self, rename):
1795     """Renames a file locally and then replicate the change.
1796
1797     This function will rename a file in the local queue directory
1798     and then replicate this rename to all the other nodes we have.
1799
1800     @type rename: list of (old, new)
1801     @param rename: List containing tuples mapping old to new names
1802
1803     """
1804     # Rename them locally
1805     for old, new in rename:
1806       utils.RenameFile(old, new, mkdir=True)
1807
1808     # ... and on all nodes
1809     names, addrs = self._GetNodeIp()
1810     result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
1811     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1812
1813   @staticmethod
1814   def _FormatJobID(job_id):
1815     """Convert a job ID to string format.
1816
1817     Currently this just does C{str(job_id)} after performing some
1818     checks, but if we want to change the job id format this will
1819     abstract this change.
1820
1821     @type job_id: int or long
1822     @param job_id: the numeric job id
1823     @rtype: str
1824     @return: the formatted job id
1825
1826     """
1827     if not isinstance(job_id, (int, long)):
1828       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1829     if job_id < 0:
1830       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1831
1832     return str(job_id)
1833
1834   @classmethod
1835   def _GetArchiveDirectory(cls, job_id):
1836     """Returns the archive directory for a job.
1837
1838     @type job_id: str
1839     @param job_id: Job identifier
1840     @rtype: str
1841     @return: Directory name
1842
1843     """
1844     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
1845
1846   def _NewSerialsUnlocked(self, count):
1847     """Generates a new job identifier.
1848
1849     Job identifiers are unique during the lifetime of a cluster.
1850
1851     @type count: integer
1852     @param count: how many serials to return
1853     @rtype: str
1854     @return: a string representing the job identifier.
1855
1856     """
1857     assert ht.TPositiveInt(count)
1858
1859     # New number
1860     serial = self._last_serial + count
1861
1862     # Write to file
1863     self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1864                              "%s\n" % serial, True)
1865
1866     result = [self._FormatJobID(v)
1867               for v in range(self._last_serial + 1, serial + 1)]
1868
1869     # Keep it only if we were able to write the file
1870     self._last_serial = serial
1871
1872     assert len(result) == count
1873
1874     return result
1875
1876   @staticmethod
1877   def _GetJobPath(job_id):
1878     """Returns the job file for a given job id.
1879
1880     @type job_id: str
1881     @param job_id: the job identifier
1882     @rtype: str
1883     @return: the path to the job file
1884
1885     """
1886     return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1887
1888   @classmethod
1889   def _GetArchivedJobPath(cls, job_id):
1890     """Returns the archived job file for a give job id.
1891
1892     @type job_id: str
1893     @param job_id: the job identifier
1894     @rtype: str
1895     @return: the path to the archived job file
1896
1897     """
1898     return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1899                           cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1900
1901   @staticmethod
1902   def _GetJobIDsUnlocked(sort=True):
1903     """Return all known job IDs.
1904
1905     The method only looks at disk because it's a requirement that all
1906     jobs are present on disk (so in the _memcache we don't have any
1907     extra IDs).
1908
1909     @type sort: boolean
1910     @param sort: perform sorting on the returned job ids
1911     @rtype: list
1912     @return: the list of job IDs
1913
1914     """
1915     jlist = []
1916     for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1917       m = constants.JOB_FILE_RE.match(filename)
1918       if m:
1919         jlist.append(m.group(1))
1920     if sort:
1921       jlist = utils.NiceSort(jlist)
1922     return jlist
1923
1924   def _LoadJobUnlocked(self, job_id):
1925     """Loads a job from the disk or memory.
1926
1927     Given a job id, this will return the cached job object if
1928     existing, or try to load the job from the disk. If loading from
1929     disk, it will also add the job to the cache.
1930
1931     @param job_id: the job id
1932     @rtype: L{_QueuedJob} or None
1933     @return: either None or the job object
1934
1935     """
1936     job = self._memcache.get(job_id, None)
1937     if job:
1938       logging.debug("Found job %s in memcache", job_id)
1939       assert job.writable, "Found read-only job in memcache"
1940       return job
1941
1942     try:
1943       job = self._LoadJobFromDisk(job_id, False)
1944       if job is None:
1945         return job
1946     except errors.JobFileCorrupted:
1947       old_path = self._GetJobPath(job_id)
1948       new_path = self._GetArchivedJobPath(job_id)
1949       if old_path == new_path:
1950         # job already archived (future case)
1951         logging.exception("Can't parse job %s", job_id)
1952       else:
1953         # non-archived case
1954         logging.exception("Can't parse job %s, will archive.", job_id)
1955         self._RenameFilesUnlocked([(old_path, new_path)])
1956       return None
1957
1958     assert job.writable, "Job just loaded is not writable"
1959
1960     self._memcache[job_id] = job
1961     logging.debug("Added job %s to the cache", job_id)
1962     return job
1963
1964   def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
1965     """Load the given job file from disk.
1966
1967     Given a job file, read, load and restore it in a _QueuedJob format.
1968
1969     @type job_id: string
1970     @param job_id: job identifier
1971     @type try_archived: bool
1972     @param try_archived: Whether to try loading an archived job
1973     @rtype: L{_QueuedJob} or None
1974     @return: either None or the job object
1975
1976     """
1977     path_functions = [(self._GetJobPath, True)]
1978
1979     if try_archived:
1980       path_functions.append((self._GetArchivedJobPath, False))
1981
1982     raw_data = None
1983     writable_default = None
1984
1985     for (fn, writable_default) in path_functions:
1986       filepath = fn(job_id)
1987       logging.debug("Loading job from %s", filepath)
1988       try:
1989         raw_data = utils.ReadFile(filepath)
1990       except EnvironmentError, err:
1991         if err.errno != errno.ENOENT:
1992           raise
1993       else:
1994         break
1995
1996     if not raw_data:
1997       return None
1998
1999     if writable is None:
2000       writable = writable_default
2001
2002     try:
2003       data = serializer.LoadJson(raw_data)
2004       job = _QueuedJob.Restore(self, data, writable)
2005     except Exception, err: # pylint: disable=W0703
2006       raise errors.JobFileCorrupted(err)
2007
2008     return job
2009
2010   def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
2011     """Load the given job file from disk.
2012
2013     Given a job file, read, load and restore it in a _QueuedJob format.
2014     In case of error reading the job, it gets returned as None, and the
2015     exception is logged.
2016
2017     @type job_id: string
2018     @param job_id: job identifier
2019     @type try_archived: bool
2020     @param try_archived: Whether to try loading an archived job
2021     @rtype: L{_QueuedJob} or None
2022     @return: either None or the job object
2023
2024     """
2025     try:
2026       return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
2027     except (errors.JobFileCorrupted, EnvironmentError):
2028       logging.exception("Can't load/parse job %s", job_id)
2029       return None
2030
2031   def _UpdateQueueSizeUnlocked(self):
2032     """Update the queue size.
2033
2034     """
2035     self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
2036
2037   @locking.ssynchronized(_LOCK)
2038   @_RequireOpenQueue
2039   def SetDrainFlag(self, drain_flag):
2040     """Sets the drain flag for the queue.
2041
2042     @type drain_flag: boolean
2043     @param drain_flag: Whether to set or unset the drain flag
2044
2045     """
2046     jstore.SetDrainFlag(drain_flag)
2047
2048     self._drained = drain_flag
2049
2050     return True
2051
2052   @_RequireOpenQueue
2053   def _SubmitJobUnlocked(self, job_id, ops):
2054     """Create and store a new job.
2055
2056     This enters the job into our job queue and also puts it on the new
2057     queue, in order for it to be picked up by the queue processors.
2058
2059     @type job_id: job ID
2060     @param job_id: the job ID for the new job
2061     @type ops: list
2062     @param ops: The list of OpCodes that will become the new job.
2063     @rtype: L{_QueuedJob}
2064     @return: the job object to be queued
2065     @raise errors.JobQueueFull: if the job queue has too many jobs in it
2066     @raise errors.GenericError: If an opcode is not valid
2067
2068     """
2069     if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
2070       raise errors.JobQueueFull()
2071
2072     job = _QueuedJob(self, job_id, ops, True)
2073
2074     # Check priority
2075     for idx, op in enumerate(job.ops):
2076       if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
2077         allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2078         raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
2079                                   " are %s" % (idx, op.priority, allowed))
2080
2081       dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
2082       if not opcodes.TNoRelativeJobDependencies(dependencies):
2083         raise errors.GenericError("Opcode %s has invalid dependencies, must"
2084                                   " match %s: %s" %
2085                                   (idx, opcodes.TNoRelativeJobDependencies,
2086                                    dependencies))
2087
2088     # Write to disk
2089     self.UpdateJobUnlocked(job)
2090
2091     self._queue_size += 1
2092
2093     logging.debug("Adding new job %s to the cache", job_id)
2094     self._memcache[job_id] = job
2095
2096     return job
2097
2098   @locking.ssynchronized(_LOCK)
2099   @_RequireOpenQueue
2100   @_RequireNonDrainedQueue
2101   def SubmitJob(self, ops):
2102     """Create and store a new job.
2103
2104     @see: L{_SubmitJobUnlocked}
2105
2106     """
2107     (job_id, ) = self._NewSerialsUnlocked(1)
2108     self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
2109     return job_id
2110
2111   @locking.ssynchronized(_LOCK)
2112   @_RequireOpenQueue
2113   @_RequireNonDrainedQueue
2114   def SubmitManyJobs(self, jobs):
2115     """Create and store multiple jobs.
2116
2117     @see: L{_SubmitJobUnlocked}
2118
2119     """
2120     all_job_ids = self._NewSerialsUnlocked(len(jobs))
2121
2122     (results, added_jobs) = \
2123       self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])
2124
2125     self._EnqueueJobsUnlocked(added_jobs)
2126
2127     return results
2128
2129   @staticmethod
2130   def _FormatSubmitError(msg, ops):
2131     """Formats errors which occurred while submitting a job.
2132
2133     """
2134     return ("%s; opcodes %s" %
2135             (msg, utils.CommaJoin(op.Summary() for op in ops)))
2136
2137   @staticmethod
2138   def _ResolveJobDependencies(resolve_fn, deps):
2139     """Resolves relative job IDs in dependencies.
2140
2141     @type resolve_fn: callable
2142     @param resolve_fn: Function to resolve a relative job ID
2143     @type deps: list
2144     @param deps: Dependencies
2145     @rtype: list
2146     @return: Resolved dependencies
2147
2148     """
2149     result = []
2150
2151     for (dep_job_id, dep_status) in deps:
2152       if ht.TRelativeJobId(dep_job_id):
2153         assert ht.TInt(dep_job_id) and dep_job_id < 0
2154         try:
2155           job_id = resolve_fn(dep_job_id)
2156         except IndexError:
2157           # Abort
2158           return (False, "Unable to resolve relative job ID %s" % dep_job_id)
2159       else:
2160         job_id = dep_job_id
2161
2162       result.append((job_id, dep_status))
2163
2164     return (True, result)
2165
2166   def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
2167     """Create and store multiple jobs.
2168
2169     @see: L{_SubmitJobUnlocked}
2170
2171     """
2172     results = []
2173     added_jobs = []
2174
2175     def resolve_fn(job_idx, reljobid):
2176       assert reljobid < 0
2177       return (previous_job_ids + job_ids[:job_idx])[reljobid]
2178
2179     for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
2180       for op in ops:
2181         if getattr(op, opcodes.DEPEND_ATTR, None):
2182           (status, data) = \
2183             self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
2184                                          op.depends)
2185           if not status:
2186             # Abort resolving dependencies
2187             assert ht.TNonEmptyString(data), "No error message"
2188             break
2189           # Use resolved dependencies
2190           op.depends = data
2191       else:
2192         try:
2193           job = self._SubmitJobUnlocked(job_id, ops)
2194         except errors.GenericError, err:
2195           status = False
2196           data = self._FormatSubmitError(str(err), ops)
2197         else:
2198           status = True
2199           data = job_id
2200           added_jobs.append(job)
2201
2202       results.append((status, data))
2203
2204     return (results, added_jobs)
2205
2206   @locking.ssynchronized(_LOCK)
2207   def _EnqueueJobs(self, jobs):
2208     """Helper function to add jobs to worker pool's queue.
2209
2210     @type jobs: list
2211     @param jobs: List of all jobs
2212
2213     """
2214     return self._EnqueueJobsUnlocked(jobs)
2215
2216   def _EnqueueJobsUnlocked(self, jobs):
2217     """Helper function to add jobs to worker pool's queue.
2218
2219     @type jobs: list
2220     @param jobs: List of all jobs
2221
2222     """
2223     assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
2224     self._wpool.AddManyTasks([(job, ) for job in jobs],
2225                              priority=[job.CalcPriority() for job in jobs])
2226
2227   def _GetJobStatusForDependencies(self, job_id):
2228     """Gets the status of a job for dependencies.
2229
2230     @type job_id: string
2231     @param job_id: Job ID
2232     @raise errors.JobLost: If job can't be found
2233
2234     """
2235     if not isinstance(job_id, basestring):
2236       job_id = self._FormatJobID(job_id)
2237
2238     # Not using in-memory cache as doing so would require an exclusive lock
2239
2240     # Try to load from disk
2241     job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2242
2243     assert not job.writable, "Got writable job" # pylint: disable=E1101
2244
2245     if job:
2246       return job.CalcStatus()
2247
2248     raise errors.JobLost("Job %s not found" % job_id)
2249
2250   @_RequireOpenQueue
2251   def UpdateJobUnlocked(self, job, replicate=True):
2252     """Update a job's on disk storage.
2253
2254     After a job has been modified, this function needs to be called in
2255     order to write the changes to disk and replicate them to the other
2256     nodes.
2257
2258     @type job: L{_QueuedJob}
2259     @param job: the changed job
2260     @type replicate: boolean
2261     @param replicate: whether to replicate the change to remote nodes
2262
2263     """
2264     if __debug__:
2265       finalized = job.CalcStatus() in constants.JOBS_FINALIZED
2266       assert (finalized ^ (job.end_timestamp is None))
2267       assert job.writable, "Can't update read-only job"
2268
2269     filename = self._GetJobPath(job.id)
2270     data = serializer.DumpJson(job.Serialize())
2271     logging.debug("Writing job %s to %s", job.id, filename)
2272     self._UpdateJobQueueFile(filename, data, replicate)
2273
2274   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
2275                         timeout):
2276     """Waits for changes in a job.
2277
2278     @type job_id: string
2279     @param job_id: Job identifier
2280     @type fields: list of strings
2281     @param fields: Which fields to check for changes
2282     @type prev_job_info: list or None
2283     @param prev_job_info: Last job information returned
2284     @type prev_log_serial: int
2285     @param prev_log_serial: Last job message serial number
2286     @type timeout: float
2287     @param timeout: maximum time to wait in seconds
2288     @rtype: tuple (job info, log entries)
2289     @return: a tuple of the job information as required via
2290         the fields parameter, and the log entries as a list
2291
2292         if the job has not changed and the timeout has expired,
2293         we instead return a special value,
2294         L{constants.JOB_NOTCHANGED}, which should be interpreted
2295         as such by the clients
2296
2297     """
2298     load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, False,
2299                              writable=False)
2300
2301     helper = _WaitForJobChangesHelper()
2302
2303     return helper(self._GetJobPath(job_id), load_fn,
2304                   fields, prev_job_info, prev_log_serial, timeout)
2305
2306   @locking.ssynchronized(_LOCK)
2307   @_RequireOpenQueue
2308   def CancelJob(self, job_id):
2309     """Cancels a job.
2310
2311     This will only succeed if the job has not started yet.
2312
2313     @type job_id: string
2314     @param job_id: job ID of job to be cancelled.
2315
2316     """
2317     logging.info("Cancelling job %s", job_id)
2318
2319     job = self._LoadJobUnlocked(job_id)
2320     if not job:
2321       logging.debug("Job %s not found", job_id)
2322       return (False, "Job %s not found" % job_id)
2323
2324     assert job.writable, "Can't cancel read-only job"
2325
2326     (success, msg) = job.Cancel()
2327
2328     if success:
2329       # If the job was finalized (e.g. cancelled), this is the final write
2330       # allowed. The job can be archived anytime.
2331       self.UpdateJobUnlocked(job)
2332
2333     return (success, msg)
2334
2335   @_RequireOpenQueue
2336   def _ArchiveJobsUnlocked(self, jobs):
2337     """Archives jobs.
2338
2339     @type jobs: list of L{_QueuedJob}
2340     @param jobs: Job objects
2341     @rtype: int
2342     @return: Number of archived jobs
2343
2344     """
2345     archive_jobs = []
2346     rename_files = []
2347     for job in jobs:
2348       assert job.writable, "Can't archive read-only job"
2349
2350       if job.CalcStatus() not in constants.JOBS_FINALIZED:
2351         logging.debug("Job %s is not yet done", job.id)
2352         continue
2353
2354       archive_jobs.append(job)
2355
2356       old = self._GetJobPath(job.id)
2357       new = self._GetArchivedJobPath(job.id)
2358       rename_files.append((old, new))
2359
2360     # TODO: What if 1..n files fail to rename?
2361     self._RenameFilesUnlocked(rename_files)
2362
2363     logging.debug("Successfully archived job(s) %s",
2364                   utils.CommaJoin(job.id for job in archive_jobs))
2365
2366     # Since we haven't quite checked, above, if we succeeded or failed renaming
2367     # the files, we update the cached queue size from the filesystem. When we
2368     # get around to fix the TODO: above, we can use the number of actually
2369     # archived jobs to fix this.
2370     self._UpdateQueueSizeUnlocked()
2371     return len(archive_jobs)
2372
2373   @locking.ssynchronized(_LOCK)
2374   @_RequireOpenQueue
2375   def ArchiveJob(self, job_id):
2376     """Archives a job.
2377
2378     This is just a wrapper over L{_ArchiveJobsUnlocked}.
2379
2380     @type job_id: string
2381     @param job_id: Job ID of job to be archived.
2382     @rtype: bool
2383     @return: Whether job was archived
2384
2385     """
2386     logging.info("Archiving job %s", job_id)
2387
2388     job = self._LoadJobUnlocked(job_id)
2389     if not job:
2390       logging.debug("Job %s not found", job_id)
2391       return False
2392
2393     return self._ArchiveJobsUnlocked([job]) == 1
2394
2395   @locking.ssynchronized(_LOCK)
2396   @_RequireOpenQueue
2397   def AutoArchiveJobs(self, age, timeout):
2398     """Archives all jobs based on age.
2399
2400     The method will archive all jobs which are older than the age
2401     parameter. For jobs that don't have an end timestamp, the start
2402     timestamp will be considered. The special '-1' age will cause
2403     archival of all jobs (that are not running or queued).
2404
2405     @type age: int
2406     @param age: the minimum age in seconds
2407
2408     """
2409     logging.info("Archiving jobs with age more than %s seconds", age)
2410
2411     now = time.time()
2412     end_time = now + timeout
2413     archived_count = 0
2414     last_touched = 0
2415
2416     all_job_ids = self._GetJobIDsUnlocked()
2417     pending = []
2418     for idx, job_id in enumerate(all_job_ids):
2419       last_touched = idx + 1
2420
2421       # Not optimal because jobs could be pending
2422       # TODO: Measure average duration for job archival and take number of
2423       # pending jobs into account.
2424       if time.time() > end_time:
2425         break
2426
2427       # Returns None if the job failed to load
2428       job = self._LoadJobUnlocked(job_id)
2429       if job:
2430         if job.end_timestamp is None:
2431           if job.start_timestamp is None:
2432             job_age = job.received_timestamp
2433           else:
2434             job_age = job.start_timestamp
2435         else:
2436           job_age = job.end_timestamp
2437
2438         if age == -1 or now - job_age[0] > age:
2439           pending.append(job)
2440
2441           # Archive 10 jobs at a time
2442           if len(pending) >= 10:
2443             archived_count += self._ArchiveJobsUnlocked(pending)
2444             pending = []
2445
2446     if pending:
2447       archived_count += self._ArchiveJobsUnlocked(pending)
2448
2449     return (archived_count, len(all_job_ids) - last_touched)
2450
2451   def QueryJobs(self, job_ids, fields):
2452     """Returns a list of jobs in queue.
2453
2454     @type job_ids: list
2455     @param job_ids: sequence of job identifiers or None for all
2456     @type fields: list
2457     @param fields: names of fields to return
2458     @rtype: list
2459     @return: list one element per job, each element being list with
2460         the requested fields
2461
2462     """
2463     jobs = []
2464     list_all = False
2465     if not job_ids:
2466       # Since files are added to/removed from the queue atomically, there's no
2467       # risk of getting the job ids in an inconsistent state.
2468       job_ids = self._GetJobIDsUnlocked()
2469       list_all = True
2470
2471     for job_id in job_ids:
2472       job = self.SafeLoadJobFromDisk(job_id, True)
2473       if job is not None:
2474         jobs.append(job.GetInfo(fields))
2475       elif not list_all:
2476         jobs.append(None)
2477
2478     return jobs
2479
2480   @locking.ssynchronized(_LOCK)
2481   def PrepareShutdown(self):
2482     """Prepare to stop the job queue.
2483
2484     Disables execution of jobs in the workerpool and returns whether there are
2485     any jobs currently running. If the latter is the case, the job queue is not
2486     yet ready for shutdown. Once this function returns C{True} L{Shutdown} can
2487     be called without interfering with any job. Queued and unfinished jobs will
2488     be resumed next time.
2489
2490     Once this function has been called no new job submissions will be accepted
2491     (see L{_RequireNonDrainedQueue}).
2492
2493     @rtype: bool
2494     @return: Whether there are any running jobs
2495
2496     """
2497     if self._accepting_jobs:
2498       self._accepting_jobs = False
2499
2500       # Tell worker pool to stop processing pending tasks
2501       self._wpool.SetActive(False)
2502
2503     return self._wpool.HasRunningTasks()
2504
2505   @locking.ssynchronized(_LOCK)
2506   @_RequireOpenQueue
2507   def Shutdown(self):
2508     """Stops the job queue.
2509
2510     This shutdowns all the worker threads an closes the queue.
2511
2512     """
2513     self._wpool.TerminateWorkers()
2514
2515     self._queue_filelock.Close()
2516     self._queue_filelock = None