cli: Error reporting for query filter parsing
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import logging
33 import errno
34 import re
35 import time
36 import weakref
37
38 try:
39   # pylint: disable-msg=E0611
40   from pyinotify import pyinotify
41 except ImportError:
42   import pyinotify
43
44 from ganeti import asyncnotifier
45 from ganeti import constants
46 from ganeti import serializer
47 from ganeti import workerpool
48 from ganeti import locking
49 from ganeti import opcodes
50 from ganeti import errors
51 from ganeti import mcpu
52 from ganeti import utils
53 from ganeti import jstore
54 from ganeti import rpc
55 from ganeti import runtime
56 from ganeti import netutils
57 from ganeti import compat
58
59
60 JOBQUEUE_THREADS = 25
61 JOBS_PER_ARCHIVE_DIRECTORY = 10000
62
63 # member lock names to be passed to @ssynchronized decorator
64 _LOCK = "_lock"
65 _QUEUE = "_queue"
66
67
68 class CancelJob(Exception):
69   """Special exception to cancel a job.
70
71   """
72
73
74 def TimeStampNow():
75   """Returns the current timestamp.
76
77   @rtype: tuple
78   @return: the current time in the (seconds, microseconds) format
79
80   """
81   return utils.SplitTime(time.time())
82
83
84 class _QueuedOpCode(object):
85   """Encapsulates an opcode object.
86
87   @ivar log: holds the execution log and consists of tuples
88   of the form C{(log_serial, timestamp, level, message)}
89   @ivar input: the OpCode we encapsulate
90   @ivar status: the current status
91   @ivar result: the result of the LU execution
92   @ivar start_timestamp: timestamp for the start of the execution
93   @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
94   @ivar stop_timestamp: timestamp for the end of the execution
95
96   """
97   __slots__ = ["input", "status", "result", "log", "priority",
98                "start_timestamp", "exec_timestamp", "end_timestamp",
99                "__weakref__"]
100
101   def __init__(self, op):
102     """Constructor for the _QuededOpCode.
103
104     @type op: L{opcodes.OpCode}
105     @param op: the opcode we encapsulate
106
107     """
108     self.input = op
109     self.status = constants.OP_STATUS_QUEUED
110     self.result = None
111     self.log = []
112     self.start_timestamp = None
113     self.exec_timestamp = None
114     self.end_timestamp = None
115
116     # Get initial priority (it might change during the lifetime of this opcode)
117     self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
118
119   @classmethod
120   def Restore(cls, state):
121     """Restore the _QueuedOpCode from the serialized form.
122
123     @type state: dict
124     @param state: the serialized state
125     @rtype: _QueuedOpCode
126     @return: a new _QueuedOpCode instance
127
128     """
129     obj = _QueuedOpCode.__new__(cls)
130     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
131     obj.status = state["status"]
132     obj.result = state["result"]
133     obj.log = state["log"]
134     obj.start_timestamp = state.get("start_timestamp", None)
135     obj.exec_timestamp = state.get("exec_timestamp", None)
136     obj.end_timestamp = state.get("end_timestamp", None)
137     obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
138     return obj
139
140   def Serialize(self):
141     """Serializes this _QueuedOpCode.
142
143     @rtype: dict
144     @return: the dictionary holding the serialized state
145
146     """
147     return {
148       "input": self.input.__getstate__(),
149       "status": self.status,
150       "result": self.result,
151       "log": self.log,
152       "start_timestamp": self.start_timestamp,
153       "exec_timestamp": self.exec_timestamp,
154       "end_timestamp": self.end_timestamp,
155       "priority": self.priority,
156       }
157
158
159 class _QueuedJob(object):
160   """In-memory job representation.
161
162   This is what we use to track the user-submitted jobs. Locking must
163   be taken care of by users of this class.
164
165   @type queue: L{JobQueue}
166   @ivar queue: the parent queue
167   @ivar id: the job ID
168   @type ops: list
169   @ivar ops: the list of _QueuedOpCode that constitute the job
170   @type log_serial: int
171   @ivar log_serial: holds the index for the next log entry
172   @ivar received_timestamp: the timestamp for when the job was received
173   @ivar start_timestmap: the timestamp for start of execution
174   @ivar end_timestamp: the timestamp for end of execution
175
176   """
177   # pylint: disable-msg=W0212
178   __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
179                "received_timestamp", "start_timestamp", "end_timestamp",
180                "__weakref__"]
181
182   def __init__(self, queue, job_id, ops):
183     """Constructor for the _QueuedJob.
184
185     @type queue: L{JobQueue}
186     @param queue: our parent queue
187     @type job_id: job_id
188     @param job_id: our job id
189     @type ops: list
190     @param ops: the list of opcodes we hold, which will be encapsulated
191         in _QueuedOpCodes
192
193     """
194     if not ops:
195       raise errors.GenericError("A job needs at least one opcode")
196
197     self.queue = queue
198     self.id = job_id
199     self.ops = [_QueuedOpCode(op) for op in ops]
200     self.log_serial = 0
201     self.received_timestamp = TimeStampNow()
202     self.start_timestamp = None
203     self.end_timestamp = None
204
205     self._InitInMemory(self)
206
207   @staticmethod
208   def _InitInMemory(obj):
209     """Initializes in-memory variables.
210
211     """
212     obj.ops_iter = None
213     obj.cur_opctx = None
214
215   def __repr__(self):
216     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
217               "id=%s" % self.id,
218               "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
219
220     return "<%s at %#x>" % (" ".join(status), id(self))
221
222   @classmethod
223   def Restore(cls, queue, state):
224     """Restore a _QueuedJob from serialized state:
225
226     @type queue: L{JobQueue}
227     @param queue: to which queue the restored job belongs
228     @type state: dict
229     @param state: the serialized state
230     @rtype: _JobQueue
231     @return: the restored _JobQueue instance
232
233     """
234     obj = _QueuedJob.__new__(cls)
235     obj.queue = queue
236     obj.id = state["id"]
237     obj.received_timestamp = state.get("received_timestamp", None)
238     obj.start_timestamp = state.get("start_timestamp", None)
239     obj.end_timestamp = state.get("end_timestamp", None)
240
241     obj.ops = []
242     obj.log_serial = 0
243     for op_state in state["ops"]:
244       op = _QueuedOpCode.Restore(op_state)
245       for log_entry in op.log:
246         obj.log_serial = max(obj.log_serial, log_entry[0])
247       obj.ops.append(op)
248
249     cls._InitInMemory(obj)
250
251     return obj
252
253   def Serialize(self):
254     """Serialize the _JobQueue instance.
255
256     @rtype: dict
257     @return: the serialized state
258
259     """
260     return {
261       "id": self.id,
262       "ops": [op.Serialize() for op in self.ops],
263       "start_timestamp": self.start_timestamp,
264       "end_timestamp": self.end_timestamp,
265       "received_timestamp": self.received_timestamp,
266       }
267
268   def CalcStatus(self):
269     """Compute the status of this job.
270
271     This function iterates over all the _QueuedOpCodes in the job and
272     based on their status, computes the job status.
273
274     The algorithm is:
275       - if we find a cancelled, or finished with error, the job
276         status will be the same
277       - otherwise, the last opcode with the status one of:
278           - waitlock
279           - canceling
280           - running
281
282         will determine the job status
283
284       - otherwise, it means either all opcodes are queued, or success,
285         and the job status will be the same
286
287     @return: the job status
288
289     """
290     status = constants.JOB_STATUS_QUEUED
291
292     all_success = True
293     for op in self.ops:
294       if op.status == constants.OP_STATUS_SUCCESS:
295         continue
296
297       all_success = False
298
299       if op.status == constants.OP_STATUS_QUEUED:
300         pass
301       elif op.status == constants.OP_STATUS_WAITLOCK:
302         status = constants.JOB_STATUS_WAITLOCK
303       elif op.status == constants.OP_STATUS_RUNNING:
304         status = constants.JOB_STATUS_RUNNING
305       elif op.status == constants.OP_STATUS_CANCELING:
306         status = constants.JOB_STATUS_CANCELING
307         break
308       elif op.status == constants.OP_STATUS_ERROR:
309         status = constants.JOB_STATUS_ERROR
310         # The whole job fails if one opcode failed
311         break
312       elif op.status == constants.OP_STATUS_CANCELED:
313         status = constants.OP_STATUS_CANCELED
314         break
315
316     if all_success:
317       status = constants.JOB_STATUS_SUCCESS
318
319     return status
320
321   def CalcPriority(self):
322     """Gets the current priority for this job.
323
324     Only unfinished opcodes are considered. When all are done, the default
325     priority is used.
326
327     @rtype: int
328
329     """
330     priorities = [op.priority for op in self.ops
331                   if op.status not in constants.OPS_FINALIZED]
332
333     if not priorities:
334       # All opcodes are done, assume default priority
335       return constants.OP_PRIO_DEFAULT
336
337     return min(priorities)
338
339   def GetLogEntries(self, newer_than):
340     """Selectively returns the log entries.
341
342     @type newer_than: None or int
343     @param newer_than: if this is None, return all log entries,
344         otherwise return only the log entries with serial higher
345         than this value
346     @rtype: list
347     @return: the list of the log entries selected
348
349     """
350     if newer_than is None:
351       serial = -1
352     else:
353       serial = newer_than
354
355     entries = []
356     for op in self.ops:
357       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
358
359     return entries
360
361   def GetInfo(self, fields):
362     """Returns information about a job.
363
364     @type fields: list
365     @param fields: names of fields to return
366     @rtype: list
367     @return: list with one element for each field
368     @raise errors.OpExecError: when an invalid field
369         has been passed
370
371     """
372     row = []
373     for fname in fields:
374       if fname == "id":
375         row.append(self.id)
376       elif fname == "status":
377         row.append(self.CalcStatus())
378       elif fname == "priority":
379         row.append(self.CalcPriority())
380       elif fname == "ops":
381         row.append([op.input.__getstate__() for op in self.ops])
382       elif fname == "opresult":
383         row.append([op.result for op in self.ops])
384       elif fname == "opstatus":
385         row.append([op.status for op in self.ops])
386       elif fname == "oplog":
387         row.append([op.log for op in self.ops])
388       elif fname == "opstart":
389         row.append([op.start_timestamp for op in self.ops])
390       elif fname == "opexec":
391         row.append([op.exec_timestamp for op in self.ops])
392       elif fname == "opend":
393         row.append([op.end_timestamp for op in self.ops])
394       elif fname == "oppriority":
395         row.append([op.priority for op in self.ops])
396       elif fname == "received_ts":
397         row.append(self.received_timestamp)
398       elif fname == "start_ts":
399         row.append(self.start_timestamp)
400       elif fname == "end_ts":
401         row.append(self.end_timestamp)
402       elif fname == "summary":
403         row.append([op.input.Summary() for op in self.ops])
404       else:
405         raise errors.OpExecError("Invalid self query field '%s'" % fname)
406     return row
407
408   def MarkUnfinishedOps(self, status, result):
409     """Mark unfinished opcodes with a given status and result.
410
411     This is an utility function for marking all running or waiting to
412     be run opcodes with a given status. Opcodes which are already
413     finalised are not changed.
414
415     @param status: a given opcode status
416     @param result: the opcode result
417
418     """
419     not_marked = True
420     for op in self.ops:
421       if op.status in constants.OPS_FINALIZED:
422         assert not_marked, "Finalized opcodes found after non-finalized ones"
423         continue
424       op.status = status
425       op.result = result
426       not_marked = False
427
428   def Cancel(self):
429     """Marks job as canceled/-ing if possible.
430
431     @rtype: tuple; (bool, string)
432     @return: Boolean describing whether job was successfully canceled or marked
433       as canceling and a text message
434
435     """
436     status = self.CalcStatus()
437
438     if status == constants.JOB_STATUS_QUEUED:
439       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
440                              "Job canceled by request")
441       return (True, "Job %s canceled" % self.id)
442
443     elif status == constants.JOB_STATUS_WAITLOCK:
444       # The worker will notice the new status and cancel the job
445       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
446       return (True, "Job %s will be canceled" % self.id)
447
448     else:
449       logging.debug("Job %s is no longer waiting in the queue", self.id)
450       return (False, "Job %s is no longer waiting in the queue" % self.id)
451
452
453 class _OpExecCallbacks(mcpu.OpExecCbBase):
454   def __init__(self, queue, job, op):
455     """Initializes this class.
456
457     @type queue: L{JobQueue}
458     @param queue: Job queue
459     @type job: L{_QueuedJob}
460     @param job: Job object
461     @type op: L{_QueuedOpCode}
462     @param op: OpCode
463
464     """
465     assert queue, "Queue is missing"
466     assert job, "Job is missing"
467     assert op, "Opcode is missing"
468
469     self._queue = queue
470     self._job = job
471     self._op = op
472
473   def _CheckCancel(self):
474     """Raises an exception to cancel the job if asked to.
475
476     """
477     # Cancel here if we were asked to
478     if self._op.status == constants.OP_STATUS_CANCELING:
479       logging.debug("Canceling opcode")
480       raise CancelJob()
481
482   @locking.ssynchronized(_QUEUE, shared=1)
483   def NotifyStart(self):
484     """Mark the opcode as running, not lock-waiting.
485
486     This is called from the mcpu code as a notifier function, when the LU is
487     finally about to start the Exec() method. Of course, to have end-user
488     visible results, the opcode must be initially (before calling into
489     Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
490
491     """
492     assert self._op in self._job.ops
493     assert self._op.status in (constants.OP_STATUS_WAITLOCK,
494                                constants.OP_STATUS_CANCELING)
495
496     # Cancel here if we were asked to
497     self._CheckCancel()
498
499     logging.debug("Opcode is now running")
500
501     self._op.status = constants.OP_STATUS_RUNNING
502     self._op.exec_timestamp = TimeStampNow()
503
504     # And finally replicate the job status
505     self._queue.UpdateJobUnlocked(self._job)
506
507   @locking.ssynchronized(_QUEUE, shared=1)
508   def _AppendFeedback(self, timestamp, log_type, log_msg):
509     """Internal feedback append function, with locks
510
511     """
512     self._job.log_serial += 1
513     self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
514     self._queue.UpdateJobUnlocked(self._job, replicate=False)
515
516   def Feedback(self, *args):
517     """Append a log entry.
518
519     """
520     assert len(args) < 3
521
522     if len(args) == 1:
523       log_type = constants.ELOG_MESSAGE
524       log_msg = args[0]
525     else:
526       (log_type, log_msg) = args
527
528     # The time is split to make serialization easier and not lose
529     # precision.
530     timestamp = utils.SplitTime(time.time())
531     self._AppendFeedback(timestamp, log_type, log_msg)
532
533   def CheckCancel(self):
534     """Check whether job has been cancelled.
535
536     """
537     assert self._op.status in (constants.OP_STATUS_WAITLOCK,
538                                constants.OP_STATUS_CANCELING)
539
540     # Cancel here if we were asked to
541     self._CheckCancel()
542
543   def SubmitManyJobs(self, jobs):
544     """Submits jobs for processing.
545
546     See L{JobQueue.SubmitManyJobs}.
547
548     """
549     # Locking is done in job queue
550     return self._queue.SubmitManyJobs(jobs)
551
552
553 class _JobChangesChecker(object):
554   def __init__(self, fields, prev_job_info, prev_log_serial):
555     """Initializes this class.
556
557     @type fields: list of strings
558     @param fields: Fields requested by LUXI client
559     @type prev_job_info: string
560     @param prev_job_info: previous job info, as passed by the LUXI client
561     @type prev_log_serial: string
562     @param prev_log_serial: previous job serial, as passed by the LUXI client
563
564     """
565     self._fields = fields
566     self._prev_job_info = prev_job_info
567     self._prev_log_serial = prev_log_serial
568
569   def __call__(self, job):
570     """Checks whether job has changed.
571
572     @type job: L{_QueuedJob}
573     @param job: Job object
574
575     """
576     status = job.CalcStatus()
577     job_info = job.GetInfo(self._fields)
578     log_entries = job.GetLogEntries(self._prev_log_serial)
579
580     # Serializing and deserializing data can cause type changes (e.g. from
581     # tuple to list) or precision loss. We're doing it here so that we get
582     # the same modifications as the data received from the client. Without
583     # this, the comparison afterwards might fail without the data being
584     # significantly different.
585     # TODO: we just deserialized from disk, investigate how to make sure that
586     # the job info and log entries are compatible to avoid this further step.
587     # TODO: Doing something like in testutils.py:UnifyValueType might be more
588     # efficient, though floats will be tricky
589     job_info = serializer.LoadJson(serializer.DumpJson(job_info))
590     log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
591
592     # Don't even try to wait if the job is no longer running, there will be
593     # no changes.
594     if (status not in (constants.JOB_STATUS_QUEUED,
595                        constants.JOB_STATUS_RUNNING,
596                        constants.JOB_STATUS_WAITLOCK) or
597         job_info != self._prev_job_info or
598         (log_entries and self._prev_log_serial != log_entries[0][0])):
599       logging.debug("Job %s changed", job.id)
600       return (job_info, log_entries)
601
602     return None
603
604
605 class _JobFileChangesWaiter(object):
606   def __init__(self, filename):
607     """Initializes this class.
608
609     @type filename: string
610     @param filename: Path to job file
611     @raises errors.InotifyError: if the notifier cannot be setup
612
613     """
614     self._wm = pyinotify.WatchManager()
615     self._inotify_handler = \
616       asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
617     self._notifier = \
618       pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
619     try:
620       self._inotify_handler.enable()
621     except Exception:
622       # pyinotify doesn't close file descriptors automatically
623       self._notifier.stop()
624       raise
625
626   def _OnInotify(self, notifier_enabled):
627     """Callback for inotify.
628
629     """
630     if not notifier_enabled:
631       self._inotify_handler.enable()
632
633   def Wait(self, timeout):
634     """Waits for the job file to change.
635
636     @type timeout: float
637     @param timeout: Timeout in seconds
638     @return: Whether there have been events
639
640     """
641     assert timeout >= 0
642     have_events = self._notifier.check_events(timeout * 1000)
643     if have_events:
644       self._notifier.read_events()
645     self._notifier.process_events()
646     return have_events
647
648   def Close(self):
649     """Closes underlying notifier and its file descriptor.
650
651     """
652     self._notifier.stop()
653
654
655 class _JobChangesWaiter(object):
656   def __init__(self, filename):
657     """Initializes this class.
658
659     @type filename: string
660     @param filename: Path to job file
661
662     """
663     self._filewaiter = None
664     self._filename = filename
665
666   def Wait(self, timeout):
667     """Waits for a job to change.
668
669     @type timeout: float
670     @param timeout: Timeout in seconds
671     @return: Whether there have been events
672
673     """
674     if self._filewaiter:
675       return self._filewaiter.Wait(timeout)
676
677     # Lazy setup: Avoid inotify setup cost when job file has already changed.
678     # If this point is reached, return immediately and let caller check the job
679     # file again in case there were changes since the last check. This avoids a
680     # race condition.
681     self._filewaiter = _JobFileChangesWaiter(self._filename)
682
683     return True
684
685   def Close(self):
686     """Closes underlying waiter.
687
688     """
689     if self._filewaiter:
690       self._filewaiter.Close()
691
692
693 class _WaitForJobChangesHelper(object):
694   """Helper class using inotify to wait for changes in a job file.
695
696   This class takes a previous job status and serial, and alerts the client when
697   the current job status has changed.
698
699   """
700   @staticmethod
701   def _CheckForChanges(job_load_fn, check_fn):
702     job = job_load_fn()
703     if not job:
704       raise errors.JobLost()
705
706     result = check_fn(job)
707     if result is None:
708       raise utils.RetryAgain()
709
710     return result
711
712   def __call__(self, filename, job_load_fn,
713                fields, prev_job_info, prev_log_serial, timeout):
714     """Waits for changes on a job.
715
716     @type filename: string
717     @param filename: File on which to wait for changes
718     @type job_load_fn: callable
719     @param job_load_fn: Function to load job
720     @type fields: list of strings
721     @param fields: Which fields to check for changes
722     @type prev_job_info: list or None
723     @param prev_job_info: Last job information returned
724     @type prev_log_serial: int
725     @param prev_log_serial: Last job message serial number
726     @type timeout: float
727     @param timeout: maximum time to wait in seconds
728
729     """
730     try:
731       check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
732       waiter = _JobChangesWaiter(filename)
733       try:
734         return utils.Retry(compat.partial(self._CheckForChanges,
735                                           job_load_fn, check_fn),
736                            utils.RETRY_REMAINING_TIME, timeout,
737                            wait_fn=waiter.Wait)
738       finally:
739         waiter.Close()
740     except (errors.InotifyError, errors.JobLost):
741       return None
742     except utils.RetryTimeout:
743       return constants.JOB_NOTCHANGED
744
745
746 def _EncodeOpError(err):
747   """Encodes an error which occurred while processing an opcode.
748
749   """
750   if isinstance(err, errors.GenericError):
751     to_encode = err
752   else:
753     to_encode = errors.OpExecError(str(err))
754
755   return errors.EncodeException(to_encode)
756
757
758 class _TimeoutStrategyWrapper:
759   def __init__(self, fn):
760     """Initializes this class.
761
762     """
763     self._fn = fn
764     self._next = None
765
766   def _Advance(self):
767     """Gets the next timeout if necessary.
768
769     """
770     if self._next is None:
771       self._next = self._fn()
772
773   def Peek(self):
774     """Returns the next timeout.
775
776     """
777     self._Advance()
778     return self._next
779
780   def Next(self):
781     """Returns the current timeout and advances the internal state.
782
783     """
784     self._Advance()
785     result = self._next
786     self._next = None
787     return result
788
789
790 class _OpExecContext:
791   def __init__(self, op, index, log_prefix, timeout_strategy_factory):
792     """Initializes this class.
793
794     """
795     self.op = op
796     self.index = index
797     self.log_prefix = log_prefix
798     self.summary = op.input.Summary()
799
800     self._timeout_strategy_factory = timeout_strategy_factory
801     self._ResetTimeoutStrategy()
802
803   def _ResetTimeoutStrategy(self):
804     """Creates a new timeout strategy.
805
806     """
807     self._timeout_strategy = \
808       _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
809
810   def CheckPriorityIncrease(self):
811     """Checks whether priority can and should be increased.
812
813     Called when locks couldn't be acquired.
814
815     """
816     op = self.op
817
818     # Exhausted all retries and next round should not use blocking acquire
819     # for locks?
820     if (self._timeout_strategy.Peek() is None and
821         op.priority > constants.OP_PRIO_HIGHEST):
822       logging.debug("Increasing priority")
823       op.priority -= 1
824       self._ResetTimeoutStrategy()
825       return True
826
827     return False
828
829   def GetNextLockTimeout(self):
830     """Returns the next lock acquire timeout.
831
832     """
833     return self._timeout_strategy.Next()
834
835
836 class _JobProcessor(object):
837   def __init__(self, queue, opexec_fn, job,
838                _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
839     """Initializes this class.
840
841     """
842     self.queue = queue
843     self.opexec_fn = opexec_fn
844     self.job = job
845     self._timeout_strategy_factory = _timeout_strategy_factory
846
847   @staticmethod
848   def _FindNextOpcode(job, timeout_strategy_factory):
849     """Locates the next opcode to run.
850
851     @type job: L{_QueuedJob}
852     @param job: Job object
853     @param timeout_strategy_factory: Callable to create new timeout strategy
854
855     """
856     # Create some sort of a cache to speed up locating next opcode for future
857     # lookups
858     # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
859     # pending and one for processed ops.
860     if job.ops_iter is None:
861       job.ops_iter = enumerate(job.ops)
862
863     # Find next opcode to run
864     while True:
865       try:
866         (idx, op) = job.ops_iter.next()
867       except StopIteration:
868         raise errors.ProgrammerError("Called for a finished job")
869
870       if op.status == constants.OP_STATUS_RUNNING:
871         # Found an opcode already marked as running
872         raise errors.ProgrammerError("Called for job marked as running")
873
874       opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
875                              timeout_strategy_factory)
876
877       if op.status == constants.OP_STATUS_CANCELED:
878         # Cancelled jobs are handled by the caller
879         assert not compat.any(i.status != constants.OP_STATUS_CANCELED
880                               for i in job.ops[idx:])
881
882       elif op.status in constants.OPS_FINALIZED:
883         # This is a job that was partially completed before master daemon
884         # shutdown, so it can be expected that some opcodes are already
885         # completed successfully (if any did error out, then the whole job
886         # should have been aborted and not resubmitted for processing).
887         logging.info("%s: opcode %s already processed, skipping",
888                      opctx.log_prefix, opctx.summary)
889         continue
890
891       return opctx
892
893   @staticmethod
894   def _MarkWaitlock(job, op):
895     """Marks an opcode as waiting for locks.
896
897     The job's start timestamp is also set if necessary.
898
899     @type job: L{_QueuedJob}
900     @param job: Job object
901     @type op: L{_QueuedOpCode}
902     @param op: Opcode object
903
904     """
905     assert op in job.ops
906     assert op.status in (constants.OP_STATUS_QUEUED,
907                          constants.OP_STATUS_WAITLOCK)
908
909     update = False
910
911     op.result = None
912
913     if op.status == constants.OP_STATUS_QUEUED:
914       op.status = constants.OP_STATUS_WAITLOCK
915       update = True
916
917     if op.start_timestamp is None:
918       op.start_timestamp = TimeStampNow()
919       update = True
920
921     if job.start_timestamp is None:
922       job.start_timestamp = op.start_timestamp
923       update = True
924
925     assert op.status == constants.OP_STATUS_WAITLOCK
926
927     return update
928
929   def _ExecOpCodeUnlocked(self, opctx):
930     """Processes one opcode and returns the result.
931
932     """
933     op = opctx.op
934
935     assert op.status == constants.OP_STATUS_WAITLOCK
936
937     timeout = opctx.GetNextLockTimeout()
938
939     try:
940       # Make sure not to hold queue lock while calling ExecOpCode
941       result = self.opexec_fn(op.input,
942                               _OpExecCallbacks(self.queue, self.job, op),
943                               timeout=timeout, priority=op.priority)
944     except mcpu.LockAcquireTimeout:
945       assert timeout is not None, "Received timeout for blocking acquire"
946       logging.debug("Couldn't acquire locks in %0.6fs", timeout)
947
948       assert op.status in (constants.OP_STATUS_WAITLOCK,
949                            constants.OP_STATUS_CANCELING)
950
951       # Was job cancelled while we were waiting for the lock?
952       if op.status == constants.OP_STATUS_CANCELING:
953         return (constants.OP_STATUS_CANCELING, None)
954
955       # Stay in waitlock while trying to re-acquire lock
956       return (constants.OP_STATUS_WAITLOCK, None)
957     except CancelJob:
958       logging.exception("%s: Canceling job", opctx.log_prefix)
959       assert op.status == constants.OP_STATUS_CANCELING
960       return (constants.OP_STATUS_CANCELING, None)
961     except Exception, err: # pylint: disable-msg=W0703
962       logging.exception("%s: Caught exception in %s",
963                         opctx.log_prefix, opctx.summary)
964       return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
965     else:
966       logging.debug("%s: %s successful",
967                     opctx.log_prefix, opctx.summary)
968       return (constants.OP_STATUS_SUCCESS, result)
969
970   def __call__(self, _nextop_fn=None):
971     """Continues execution of a job.
972
973     @param _nextop_fn: Callback function for tests
974     @rtype: bool
975     @return: True if job is finished, False if processor needs to be called
976              again
977
978     """
979     queue = self.queue
980     job = self.job
981
982     logging.debug("Processing job %s", job.id)
983
984     queue.acquire(shared=1)
985     try:
986       opcount = len(job.ops)
987
988       # Is a previous opcode still pending?
989       if job.cur_opctx:
990         opctx = job.cur_opctx
991         job.cur_opctx = None
992       else:
993         if __debug__ and _nextop_fn:
994           _nextop_fn()
995         opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
996
997       op = opctx.op
998
999       # Consistency check
1000       assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1001                                      constants.OP_STATUS_CANCELING,
1002                                      constants.OP_STATUS_CANCELED)
1003                         for i in job.ops[opctx.index + 1:])
1004
1005       assert op.status in (constants.OP_STATUS_QUEUED,
1006                            constants.OP_STATUS_WAITLOCK,
1007                            constants.OP_STATUS_CANCELING,
1008                            constants.OP_STATUS_CANCELED)
1009
1010       assert (op.priority <= constants.OP_PRIO_LOWEST and
1011               op.priority >= constants.OP_PRIO_HIGHEST)
1012
1013       if op.status not in (constants.OP_STATUS_CANCELING,
1014                            constants.OP_STATUS_CANCELED):
1015         assert op.status in (constants.OP_STATUS_QUEUED,
1016                              constants.OP_STATUS_WAITLOCK)
1017
1018         # Prepare to start opcode
1019         if self._MarkWaitlock(job, op):
1020           # Write to disk
1021           queue.UpdateJobUnlocked(job)
1022
1023         assert op.status == constants.OP_STATUS_WAITLOCK
1024         assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1025         assert job.start_timestamp and op.start_timestamp
1026
1027         logging.info("%s: opcode %s waiting for locks",
1028                      opctx.log_prefix, opctx.summary)
1029
1030         queue.release()
1031         try:
1032           (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1033         finally:
1034           queue.acquire(shared=1)
1035
1036         op.status = op_status
1037         op.result = op_result
1038
1039         if op.status == constants.OP_STATUS_WAITLOCK:
1040           # Couldn't get locks in time
1041           assert not op.end_timestamp
1042         else:
1043           # Finalize opcode
1044           op.end_timestamp = TimeStampNow()
1045
1046           if op.status == constants.OP_STATUS_CANCELING:
1047             assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1048                                   for i in job.ops[opctx.index:])
1049           else:
1050             assert op.status in constants.OPS_FINALIZED
1051
1052       if op.status == constants.OP_STATUS_WAITLOCK:
1053         finalize = False
1054
1055         if opctx.CheckPriorityIncrease():
1056           # Priority was changed, need to update on-disk file
1057           queue.UpdateJobUnlocked(job)
1058
1059         # Keep around for another round
1060         job.cur_opctx = opctx
1061
1062         assert (op.priority <= constants.OP_PRIO_LOWEST and
1063                 op.priority >= constants.OP_PRIO_HIGHEST)
1064
1065         # In no case must the status be finalized here
1066         assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1067
1068       else:
1069         # Ensure all opcodes so far have been successful
1070         assert (opctx.index == 0 or
1071                 compat.all(i.status == constants.OP_STATUS_SUCCESS
1072                            for i in job.ops[:opctx.index]))
1073
1074         # Reset context
1075         job.cur_opctx = None
1076
1077         if op.status == constants.OP_STATUS_SUCCESS:
1078           finalize = False
1079
1080         elif op.status == constants.OP_STATUS_ERROR:
1081           # Ensure failed opcode has an exception as its result
1082           assert errors.GetEncodedError(job.ops[opctx.index].result)
1083
1084           to_encode = errors.OpExecError("Preceding opcode failed")
1085           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1086                                 _EncodeOpError(to_encode))
1087           finalize = True
1088
1089           # Consistency check
1090           assert compat.all(i.status == constants.OP_STATUS_ERROR and
1091                             errors.GetEncodedError(i.result)
1092                             for i in job.ops[opctx.index:])
1093
1094         elif op.status == constants.OP_STATUS_CANCELING:
1095           job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1096                                 "Job canceled by request")
1097           finalize = True
1098
1099         elif op.status == constants.OP_STATUS_CANCELED:
1100           finalize = True
1101
1102         else:
1103           raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1104
1105         # Finalizing or last opcode?
1106         if finalize or opctx.index == (opcount - 1):
1107           # All opcodes have been run, finalize job
1108           job.end_timestamp = TimeStampNow()
1109
1110         # Write to disk. If the job status is final, this is the final write
1111         # allowed. Once the file has been written, it can be archived anytime.
1112         queue.UpdateJobUnlocked(job)
1113
1114         if finalize or opctx.index == (opcount - 1):
1115           logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1116           return True
1117
1118       return False
1119     finally:
1120       queue.release()
1121
1122
1123 class _JobQueueWorker(workerpool.BaseWorker):
1124   """The actual job workers.
1125
1126   """
1127   def RunTask(self, job): # pylint: disable-msg=W0221
1128     """Job executor.
1129
1130     This functions processes a job. It is closely tied to the L{_QueuedJob} and
1131     L{_QueuedOpCode} classes.
1132
1133     @type job: L{_QueuedJob}
1134     @param job: the job to be processed
1135
1136     """
1137     queue = job.queue
1138     assert queue == self.pool.queue
1139
1140     self.SetTaskName("Job%s" % job.id)
1141
1142     proc = mcpu.Processor(queue.context, job.id)
1143
1144     if not _JobProcessor(queue, proc.ExecOpCode, job)():
1145       # Schedule again
1146       raise workerpool.DeferTask(priority=job.CalcPriority())
1147
1148
1149 class _JobQueueWorkerPool(workerpool.WorkerPool):
1150   """Simple class implementing a job-processing workerpool.
1151
1152   """
1153   def __init__(self, queue):
1154     super(_JobQueueWorkerPool, self).__init__("JobQueue",
1155                                               JOBQUEUE_THREADS,
1156                                               _JobQueueWorker)
1157     self.queue = queue
1158
1159
1160 def _RequireOpenQueue(fn):
1161   """Decorator for "public" functions.
1162
1163   This function should be used for all 'public' functions. That is,
1164   functions usually called from other classes. Note that this should
1165   be applied only to methods (not plain functions), since it expects
1166   that the decorated function is called with a first argument that has
1167   a '_queue_filelock' argument.
1168
1169   @warning: Use this decorator only after locking.ssynchronized
1170
1171   Example::
1172     @locking.ssynchronized(_LOCK)
1173     @_RequireOpenQueue
1174     def Example(self):
1175       pass
1176
1177   """
1178   def wrapper(self, *args, **kwargs):
1179     # pylint: disable-msg=W0212
1180     assert self._queue_filelock is not None, "Queue should be open"
1181     return fn(self, *args, **kwargs)
1182   return wrapper
1183
1184
1185 class JobQueue(object):
1186   """Queue used to manage the jobs.
1187
1188   @cvar _RE_JOB_FILE: regex matching the valid job file names
1189
1190   """
1191   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
1192
1193   def __init__(self, context):
1194     """Constructor for JobQueue.
1195
1196     The constructor will initialize the job queue object and then
1197     start loading the current jobs from disk, either for starting them
1198     (if they were queue) or for aborting them (if they were already
1199     running).
1200
1201     @type context: GanetiContext
1202     @param context: the context object for access to the configuration
1203         data and other ganeti objects
1204
1205     """
1206     self.context = context
1207     self._memcache = weakref.WeakValueDictionary()
1208     self._my_hostname = netutils.Hostname.GetSysName()
1209
1210     # The Big JobQueue lock. If a code block or method acquires it in shared
1211     # mode safe it must guarantee concurrency with all the code acquiring it in
1212     # shared mode, including itself. In order not to acquire it at all
1213     # concurrency must be guaranteed with all code acquiring it in shared mode
1214     # and all code acquiring it exclusively.
1215     self._lock = locking.SharedLock("JobQueue")
1216
1217     self.acquire = self._lock.acquire
1218     self.release = self._lock.release
1219
1220     # Initialize the queue, and acquire the filelock.
1221     # This ensures no other process is working on the job queue.
1222     self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1223
1224     # Read serial file
1225     self._last_serial = jstore.ReadSerial()
1226     assert self._last_serial is not None, ("Serial file was modified between"
1227                                            " check in jstore and here")
1228
1229     # Get initial list of nodes
1230     self._nodes = dict((n.name, n.primary_ip)
1231                        for n in self.context.cfg.GetAllNodesInfo().values()
1232                        if n.master_candidate)
1233
1234     # Remove master node
1235     self._nodes.pop(self._my_hostname, None)
1236
1237     # TODO: Check consistency across nodes
1238
1239     self._queue_size = 0
1240     self._UpdateQueueSizeUnlocked()
1241     self._drained = jstore.CheckDrainFlag()
1242
1243     # Setup worker pool
1244     self._wpool = _JobQueueWorkerPool(self)
1245     try:
1246       self._InspectQueue()
1247     except:
1248       self._wpool.TerminateWorkers()
1249       raise
1250
1251   @locking.ssynchronized(_LOCK)
1252   @_RequireOpenQueue
1253   def _InspectQueue(self):
1254     """Loads the whole job queue and resumes unfinished jobs.
1255
1256     This function needs the lock here because WorkerPool.AddTask() may start a
1257     job while we're still doing our work.
1258
1259     """
1260     logging.info("Inspecting job queue")
1261
1262     restartjobs = []
1263
1264     all_job_ids = self._GetJobIDsUnlocked()
1265     jobs_count = len(all_job_ids)
1266     lastinfo = time.time()
1267     for idx, job_id in enumerate(all_job_ids):
1268       # Give an update every 1000 jobs or 10 seconds
1269       if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1270           idx == (jobs_count - 1)):
1271         logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1272                      idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1273         lastinfo = time.time()
1274
1275       job = self._LoadJobUnlocked(job_id)
1276
1277       # a failure in loading the job can cause 'None' to be returned
1278       if job is None:
1279         continue
1280
1281       status = job.CalcStatus()
1282
1283       if status == constants.JOB_STATUS_QUEUED:
1284         restartjobs.append(job)
1285
1286       elif status in (constants.JOB_STATUS_RUNNING,
1287                       constants.JOB_STATUS_WAITLOCK,
1288                       constants.JOB_STATUS_CANCELING):
1289         logging.warning("Unfinished job %s found: %s", job.id, job)
1290
1291         if status == constants.JOB_STATUS_WAITLOCK:
1292           # Restart job
1293           job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1294           restartjobs.append(job)
1295         else:
1296           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1297                                 "Unclean master daemon shutdown")
1298
1299         self.UpdateJobUnlocked(job)
1300
1301     if restartjobs:
1302       logging.info("Restarting %s jobs", len(restartjobs))
1303       self._EnqueueJobs(restartjobs)
1304
1305     logging.info("Job queue inspection finished")
1306
1307   @locking.ssynchronized(_LOCK)
1308   @_RequireOpenQueue
1309   def AddNode(self, node):
1310     """Register a new node with the queue.
1311
1312     @type node: L{objects.Node}
1313     @param node: the node object to be added
1314
1315     """
1316     node_name = node.name
1317     assert node_name != self._my_hostname
1318
1319     # Clean queue directory on added node
1320     result = rpc.RpcRunner.call_jobqueue_purge(node_name)
1321     msg = result.fail_msg
1322     if msg:
1323       logging.warning("Cannot cleanup queue directory on node %s: %s",
1324                       node_name, msg)
1325
1326     if not node.master_candidate:
1327       # remove if existing, ignoring errors
1328       self._nodes.pop(node_name, None)
1329       # and skip the replication of the job ids
1330       return
1331
1332     # Upload the whole queue excluding archived jobs
1333     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1334
1335     # Upload current serial file
1336     files.append(constants.JOB_QUEUE_SERIAL_FILE)
1337
1338     for file_name in files:
1339       # Read file content
1340       content = utils.ReadFile(file_name)
1341
1342       result = rpc.RpcRunner.call_jobqueue_update([node_name],
1343                                                   [node.primary_ip],
1344                                                   file_name, content)
1345       msg = result[node_name].fail_msg
1346       if msg:
1347         logging.error("Failed to upload file %s to node %s: %s",
1348                       file_name, node_name, msg)
1349
1350     self._nodes[node_name] = node.primary_ip
1351
1352   @locking.ssynchronized(_LOCK)
1353   @_RequireOpenQueue
1354   def RemoveNode(self, node_name):
1355     """Callback called when removing nodes from the cluster.
1356
1357     @type node_name: str
1358     @param node_name: the name of the node to remove
1359
1360     """
1361     self._nodes.pop(node_name, None)
1362
1363   @staticmethod
1364   def _CheckRpcResult(result, nodes, failmsg):
1365     """Verifies the status of an RPC call.
1366
1367     Since we aim to keep consistency should this node (the current
1368     master) fail, we will log errors if our rpc fail, and especially
1369     log the case when more than half of the nodes fails.
1370
1371     @param result: the data as returned from the rpc call
1372     @type nodes: list
1373     @param nodes: the list of nodes we made the call to
1374     @type failmsg: str
1375     @param failmsg: the identifier to be used for logging
1376
1377     """
1378     failed = []
1379     success = []
1380
1381     for node in nodes:
1382       msg = result[node].fail_msg
1383       if msg:
1384         failed.append(node)
1385         logging.error("RPC call %s (%s) failed on node %s: %s",
1386                       result[node].call, failmsg, node, msg)
1387       else:
1388         success.append(node)
1389
1390     # +1 for the master node
1391     if (len(success) + 1) < len(failed):
1392       # TODO: Handle failing nodes
1393       logging.error("More than half of the nodes failed")
1394
1395   def _GetNodeIp(self):
1396     """Helper for returning the node name/ip list.
1397
1398     @rtype: (list, list)
1399     @return: a tuple of two lists, the first one with the node
1400         names and the second one with the node addresses
1401
1402     """
1403     # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1404     name_list = self._nodes.keys()
1405     addr_list = [self._nodes[name] for name in name_list]
1406     return name_list, addr_list
1407
1408   def _UpdateJobQueueFile(self, file_name, data, replicate):
1409     """Writes a file locally and then replicates it to all nodes.
1410
1411     This function will replace the contents of a file on the local
1412     node and then replicate it to all the other nodes we have.
1413
1414     @type file_name: str
1415     @param file_name: the path of the file to be replicated
1416     @type data: str
1417     @param data: the new contents of the file
1418     @type replicate: boolean
1419     @param replicate: whether to spread the changes to the remote nodes
1420
1421     """
1422     getents = runtime.GetEnts()
1423     utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1424                     gid=getents.masterd_gid)
1425
1426     if replicate:
1427       names, addrs = self._GetNodeIp()
1428       result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1429       self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1430
1431   def _RenameFilesUnlocked(self, rename):
1432     """Renames a file locally and then replicate the change.
1433
1434     This function will rename a file in the local queue directory
1435     and then replicate this rename to all the other nodes we have.
1436
1437     @type rename: list of (old, new)
1438     @param rename: List containing tuples mapping old to new names
1439
1440     """
1441     # Rename them locally
1442     for old, new in rename:
1443       utils.RenameFile(old, new, mkdir=True)
1444
1445     # ... and on all nodes
1446     names, addrs = self._GetNodeIp()
1447     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1448     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1449
1450   @staticmethod
1451   def _FormatJobID(job_id):
1452     """Convert a job ID to string format.
1453
1454     Currently this just does C{str(job_id)} after performing some
1455     checks, but if we want to change the job id format this will
1456     abstract this change.
1457
1458     @type job_id: int or long
1459     @param job_id: the numeric job id
1460     @rtype: str
1461     @return: the formatted job id
1462
1463     """
1464     if not isinstance(job_id, (int, long)):
1465       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1466     if job_id < 0:
1467       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1468
1469     return str(job_id)
1470
1471   @classmethod
1472   def _GetArchiveDirectory(cls, job_id):
1473     """Returns the archive directory for a job.
1474
1475     @type job_id: str
1476     @param job_id: Job identifier
1477     @rtype: str
1478     @return: Directory name
1479
1480     """
1481     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
1482
1483   def _NewSerialsUnlocked(self, count):
1484     """Generates a new job identifier.
1485
1486     Job identifiers are unique during the lifetime of a cluster.
1487
1488     @type count: integer
1489     @param count: how many serials to return
1490     @rtype: str
1491     @return: a string representing the job identifier.
1492
1493     """
1494     assert count > 0
1495     # New number
1496     serial = self._last_serial + count
1497
1498     # Write to file
1499     self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1500                              "%s\n" % serial, True)
1501
1502     result = [self._FormatJobID(v)
1503               for v in range(self._last_serial, serial + 1)]
1504     # Keep it only if we were able to write the file
1505     self._last_serial = serial
1506
1507     return result
1508
1509   @staticmethod
1510   def _GetJobPath(job_id):
1511     """Returns the job file for a given job id.
1512
1513     @type job_id: str
1514     @param job_id: the job identifier
1515     @rtype: str
1516     @return: the path to the job file
1517
1518     """
1519     return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1520
1521   @classmethod
1522   def _GetArchivedJobPath(cls, job_id):
1523     """Returns the archived job file for a give job id.
1524
1525     @type job_id: str
1526     @param job_id: the job identifier
1527     @rtype: str
1528     @return: the path to the archived job file
1529
1530     """
1531     return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1532                           cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1533
1534   def _GetJobIDsUnlocked(self, sort=True):
1535     """Return all known job IDs.
1536
1537     The method only looks at disk because it's a requirement that all
1538     jobs are present on disk (so in the _memcache we don't have any
1539     extra IDs).
1540
1541     @type sort: boolean
1542     @param sort: perform sorting on the returned job ids
1543     @rtype: list
1544     @return: the list of job IDs
1545
1546     """
1547     jlist = []
1548     for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1549       m = self._RE_JOB_FILE.match(filename)
1550       if m:
1551         jlist.append(m.group(1))
1552     if sort:
1553       jlist = utils.NiceSort(jlist)
1554     return jlist
1555
1556   def _LoadJobUnlocked(self, job_id):
1557     """Loads a job from the disk or memory.
1558
1559     Given a job id, this will return the cached job object if
1560     existing, or try to load the job from the disk. If loading from
1561     disk, it will also add the job to the cache.
1562
1563     @param job_id: the job id
1564     @rtype: L{_QueuedJob} or None
1565     @return: either None or the job object
1566
1567     """
1568     job = self._memcache.get(job_id, None)
1569     if job:
1570       logging.debug("Found job %s in memcache", job_id)
1571       return job
1572
1573     try:
1574       job = self._LoadJobFromDisk(job_id)
1575       if job is None:
1576         return job
1577     except errors.JobFileCorrupted:
1578       old_path = self._GetJobPath(job_id)
1579       new_path = self._GetArchivedJobPath(job_id)
1580       if old_path == new_path:
1581         # job already archived (future case)
1582         logging.exception("Can't parse job %s", job_id)
1583       else:
1584         # non-archived case
1585         logging.exception("Can't parse job %s, will archive.", job_id)
1586         self._RenameFilesUnlocked([(old_path, new_path)])
1587       return None
1588
1589     self._memcache[job_id] = job
1590     logging.debug("Added job %s to the cache", job_id)
1591     return job
1592
1593   def _LoadJobFromDisk(self, job_id):
1594     """Load the given job file from disk.
1595
1596     Given a job file, read, load and restore it in a _QueuedJob format.
1597
1598     @type job_id: string
1599     @param job_id: job identifier
1600     @rtype: L{_QueuedJob} or None
1601     @return: either None or the job object
1602
1603     """
1604     filepath = self._GetJobPath(job_id)
1605     logging.debug("Loading job from %s", filepath)
1606     try:
1607       raw_data = utils.ReadFile(filepath)
1608     except EnvironmentError, err:
1609       if err.errno in (errno.ENOENT, ):
1610         return None
1611       raise
1612
1613     try:
1614       data = serializer.LoadJson(raw_data)
1615       job = _QueuedJob.Restore(self, data)
1616     except Exception, err: # pylint: disable-msg=W0703
1617       raise errors.JobFileCorrupted(err)
1618
1619     return job
1620
1621   def SafeLoadJobFromDisk(self, job_id):
1622     """Load the given job file from disk.
1623
1624     Given a job file, read, load and restore it in a _QueuedJob format.
1625     In case of error reading the job, it gets returned as None, and the
1626     exception is logged.
1627
1628     @type job_id: string
1629     @param job_id: job identifier
1630     @rtype: L{_QueuedJob} or None
1631     @return: either None or the job object
1632
1633     """
1634     try:
1635       return self._LoadJobFromDisk(job_id)
1636     except (errors.JobFileCorrupted, EnvironmentError):
1637       logging.exception("Can't load/parse job %s", job_id)
1638       return None
1639
1640   def _UpdateQueueSizeUnlocked(self):
1641     """Update the queue size.
1642
1643     """
1644     self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1645
1646   @locking.ssynchronized(_LOCK)
1647   @_RequireOpenQueue
1648   def SetDrainFlag(self, drain_flag):
1649     """Sets the drain flag for the queue.
1650
1651     @type drain_flag: boolean
1652     @param drain_flag: Whether to set or unset the drain flag
1653
1654     """
1655     jstore.SetDrainFlag(drain_flag)
1656
1657     self._drained = drain_flag
1658
1659     return True
1660
1661   @_RequireOpenQueue
1662   def _SubmitJobUnlocked(self, job_id, ops):
1663     """Create and store a new job.
1664
1665     This enters the job into our job queue and also puts it on the new
1666     queue, in order for it to be picked up by the queue processors.
1667
1668     @type job_id: job ID
1669     @param job_id: the job ID for the new job
1670     @type ops: list
1671     @param ops: The list of OpCodes that will become the new job.
1672     @rtype: L{_QueuedJob}
1673     @return: the job object to be queued
1674     @raise errors.JobQueueDrainError: if the job queue is marked for draining
1675     @raise errors.JobQueueFull: if the job queue has too many jobs in it
1676     @raise errors.GenericError: If an opcode is not valid
1677
1678     """
1679     # Ok when sharing the big job queue lock, as the drain file is created when
1680     # the lock is exclusive.
1681     if self._drained:
1682       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1683
1684     if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1685       raise errors.JobQueueFull()
1686
1687     job = _QueuedJob(self, job_id, ops)
1688
1689     # Check priority
1690     for idx, op in enumerate(job.ops):
1691       if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
1692         allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
1693         raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
1694                                   " are %s" % (idx, op.priority, allowed))
1695
1696     # Write to disk
1697     self.UpdateJobUnlocked(job)
1698
1699     self._queue_size += 1
1700
1701     logging.debug("Adding new job %s to the cache", job_id)
1702     self._memcache[job_id] = job
1703
1704     return job
1705
1706   @locking.ssynchronized(_LOCK)
1707   @_RequireOpenQueue
1708   def SubmitJob(self, ops):
1709     """Create and store a new job.
1710
1711     @see: L{_SubmitJobUnlocked}
1712
1713     """
1714     job_id = self._NewSerialsUnlocked(1)[0]
1715     self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
1716     return job_id
1717
1718   @locking.ssynchronized(_LOCK)
1719   @_RequireOpenQueue
1720   def SubmitManyJobs(self, jobs):
1721     """Create and store multiple jobs.
1722
1723     @see: L{_SubmitJobUnlocked}
1724
1725     """
1726     results = []
1727     added_jobs = []
1728     all_job_ids = self._NewSerialsUnlocked(len(jobs))
1729     for job_id, ops in zip(all_job_ids, jobs):
1730       try:
1731         added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
1732         status = True
1733         data = job_id
1734       except errors.GenericError, err:
1735         data = ("%s; opcodes %s" %
1736                 (err, utils.CommaJoin(op.Summary() for op in ops)))
1737         status = False
1738       results.append((status, data))
1739
1740     self._EnqueueJobs(added_jobs)
1741
1742     return results
1743
1744   def _EnqueueJobs(self, jobs):
1745     """Helper function to add jobs to worker pool's queue.
1746
1747     @type jobs: list
1748     @param jobs: List of all jobs
1749
1750     """
1751     self._wpool.AddManyTasks([(job, ) for job in jobs],
1752                              priority=[job.CalcPriority() for job in jobs])
1753
1754   @_RequireOpenQueue
1755   def UpdateJobUnlocked(self, job, replicate=True):
1756     """Update a job's on disk storage.
1757
1758     After a job has been modified, this function needs to be called in
1759     order to write the changes to disk and replicate them to the other
1760     nodes.
1761
1762     @type job: L{_QueuedJob}
1763     @param job: the changed job
1764     @type replicate: boolean
1765     @param replicate: whether to replicate the change to remote nodes
1766
1767     """
1768     filename = self._GetJobPath(job.id)
1769     data = serializer.DumpJson(job.Serialize(), indent=False)
1770     logging.debug("Writing job %s to %s", job.id, filename)
1771     self._UpdateJobQueueFile(filename, data, replicate)
1772
1773   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1774                         timeout):
1775     """Waits for changes in a job.
1776
1777     @type job_id: string
1778     @param job_id: Job identifier
1779     @type fields: list of strings
1780     @param fields: Which fields to check for changes
1781     @type prev_job_info: list or None
1782     @param prev_job_info: Last job information returned
1783     @type prev_log_serial: int
1784     @param prev_log_serial: Last job message serial number
1785     @type timeout: float
1786     @param timeout: maximum time to wait in seconds
1787     @rtype: tuple (job info, log entries)
1788     @return: a tuple of the job information as required via
1789         the fields parameter, and the log entries as a list
1790
1791         if the job has not changed and the timeout has expired,
1792         we instead return a special value,
1793         L{constants.JOB_NOTCHANGED}, which should be interpreted
1794         as such by the clients
1795
1796     """
1797     load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)
1798
1799     helper = _WaitForJobChangesHelper()
1800
1801     return helper(self._GetJobPath(job_id), load_fn,
1802                   fields, prev_job_info, prev_log_serial, timeout)
1803
1804   @locking.ssynchronized(_LOCK)
1805   @_RequireOpenQueue
1806   def CancelJob(self, job_id):
1807     """Cancels a job.
1808
1809     This will only succeed if the job has not started yet.
1810
1811     @type job_id: string
1812     @param job_id: job ID of job to be cancelled.
1813
1814     """
1815     logging.info("Cancelling job %s", job_id)
1816
1817     job = self._LoadJobUnlocked(job_id)
1818     if not job:
1819       logging.debug("Job %s not found", job_id)
1820       return (False, "Job %s not found" % job_id)
1821
1822     (success, msg) = job.Cancel()
1823
1824     if success:
1825       self.UpdateJobUnlocked(job)
1826
1827     return (success, msg)
1828
1829   @_RequireOpenQueue
1830   def _ArchiveJobsUnlocked(self, jobs):
1831     """Archives jobs.
1832
1833     @type jobs: list of L{_QueuedJob}
1834     @param jobs: Job objects
1835     @rtype: int
1836     @return: Number of archived jobs
1837
1838     """
1839     archive_jobs = []
1840     rename_files = []
1841     for job in jobs:
1842       if job.CalcStatus() not in constants.JOBS_FINALIZED:
1843         logging.debug("Job %s is not yet done", job.id)
1844         continue
1845
1846       archive_jobs.append(job)
1847
1848       old = self._GetJobPath(job.id)
1849       new = self._GetArchivedJobPath(job.id)
1850       rename_files.append((old, new))
1851
1852     # TODO: What if 1..n files fail to rename?
1853     self._RenameFilesUnlocked(rename_files)
1854
1855     logging.debug("Successfully archived job(s) %s",
1856                   utils.CommaJoin(job.id for job in archive_jobs))
1857
1858     # Since we haven't quite checked, above, if we succeeded or failed renaming
1859     # the files, we update the cached queue size from the filesystem. When we
1860     # get around to fix the TODO: above, we can use the number of actually
1861     # archived jobs to fix this.
1862     self._UpdateQueueSizeUnlocked()
1863     return len(archive_jobs)
1864
1865   @locking.ssynchronized(_LOCK)
1866   @_RequireOpenQueue
1867   def ArchiveJob(self, job_id):
1868     """Archives a job.
1869
1870     This is just a wrapper over L{_ArchiveJobsUnlocked}.
1871
1872     @type job_id: string
1873     @param job_id: Job ID of job to be archived.
1874     @rtype: bool
1875     @return: Whether job was archived
1876
1877     """
1878     logging.info("Archiving job %s", job_id)
1879
1880     job = self._LoadJobUnlocked(job_id)
1881     if not job:
1882       logging.debug("Job %s not found", job_id)
1883       return False
1884
1885     return self._ArchiveJobsUnlocked([job]) == 1
1886
1887   @locking.ssynchronized(_LOCK)
1888   @_RequireOpenQueue
1889   def AutoArchiveJobs(self, age, timeout):
1890     """Archives all jobs based on age.
1891
1892     The method will archive all jobs which are older than the age
1893     parameter. For jobs that don't have an end timestamp, the start
1894     timestamp will be considered. The special '-1' age will cause
1895     archival of all jobs (that are not running or queued).
1896
1897     @type age: int
1898     @param age: the minimum age in seconds
1899
1900     """
1901     logging.info("Archiving jobs with age more than %s seconds", age)
1902
1903     now = time.time()
1904     end_time = now + timeout
1905     archived_count = 0
1906     last_touched = 0
1907
1908     all_job_ids = self._GetJobIDsUnlocked()
1909     pending = []
1910     for idx, job_id in enumerate(all_job_ids):
1911       last_touched = idx + 1
1912
1913       # Not optimal because jobs could be pending
1914       # TODO: Measure average duration for job archival and take number of
1915       # pending jobs into account.
1916       if time.time() > end_time:
1917         break
1918
1919       # Returns None if the job failed to load
1920       job = self._LoadJobUnlocked(job_id)
1921       if job:
1922         if job.end_timestamp is None:
1923           if job.start_timestamp is None:
1924             job_age = job.received_timestamp
1925           else:
1926             job_age = job.start_timestamp
1927         else:
1928           job_age = job.end_timestamp
1929
1930         if age == -1 or now - job_age[0] > age:
1931           pending.append(job)
1932
1933           # Archive 10 jobs at a time
1934           if len(pending) >= 10:
1935             archived_count += self._ArchiveJobsUnlocked(pending)
1936             pending = []
1937
1938     if pending:
1939       archived_count += self._ArchiveJobsUnlocked(pending)
1940
1941     return (archived_count, len(all_job_ids) - last_touched)
1942
1943   def QueryJobs(self, job_ids, fields):
1944     """Returns a list of jobs in queue.
1945
1946     @type job_ids: list
1947     @param job_ids: sequence of job identifiers or None for all
1948     @type fields: list
1949     @param fields: names of fields to return
1950     @rtype: list
1951     @return: list one element per job, each element being list with
1952         the requested fields
1953
1954     """
1955     jobs = []
1956     list_all = False
1957     if not job_ids:
1958       # Since files are added to/removed from the queue atomically, there's no
1959       # risk of getting the job ids in an inconsistent state.
1960       job_ids = self._GetJobIDsUnlocked()
1961       list_all = True
1962
1963     for job_id in job_ids:
1964       job = self.SafeLoadJobFromDisk(job_id)
1965       if job is not None:
1966         jobs.append(job.GetInfo(fields))
1967       elif not list_all:
1968         jobs.append(None)
1969
1970     return jobs
1971
1972   @locking.ssynchronized(_LOCK)
1973   @_RequireOpenQueue
1974   def Shutdown(self):
1975     """Stops the job queue.
1976
1977     This shutdowns all the worker threads an closes the queue.
1978
1979     """
1980     self._wpool.TerminateWorkers()
1981
1982     self._queue_filelock.Close()
1983     self._queue_filelock = None