Fix hook node list when adding node
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import logging
33 import errno
34 import re
35 import time
36 import weakref
37
38 try:
39   # pylint: disable-msg=E0611
40   from pyinotify import pyinotify
41 except ImportError:
42   import pyinotify
43
44 from ganeti import asyncnotifier
45 from ganeti import constants
46 from ganeti import serializer
47 from ganeti import workerpool
48 from ganeti import locking
49 from ganeti import opcodes
50 from ganeti import errors
51 from ganeti import mcpu
52 from ganeti import utils
53 from ganeti import jstore
54 from ganeti import rpc
55 from ganeti import runtime
56 from ganeti import netutils
57 from ganeti import compat
58
59
60 JOBQUEUE_THREADS = 25
61 JOBS_PER_ARCHIVE_DIRECTORY = 10000
62
63 # member lock names to be passed to @ssynchronized decorator
64 _LOCK = "_lock"
65 _QUEUE = "_queue"
66
67
68 class CancelJob(Exception):
69   """Special exception to cancel a job.
70
71   """
72
73
74 def TimeStampNow():
75   """Returns the current timestamp.
76
77   @rtype: tuple
78   @return: the current time in the (seconds, microseconds) format
79
80   """
81   return utils.SplitTime(time.time())
82
83
84 class _QueuedOpCode(object):
85   """Encapsulates an opcode object.
86
87   @ivar log: holds the execution log and consists of tuples
88   of the form C{(log_serial, timestamp, level, message)}
89   @ivar input: the OpCode we encapsulate
90   @ivar status: the current status
91   @ivar result: the result of the LU execution
92   @ivar start_timestamp: timestamp for the start of the execution
93   @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
94   @ivar stop_timestamp: timestamp for the end of the execution
95
96   """
97   __slots__ = ["input", "status", "result", "log", "priority",
98                "start_timestamp", "exec_timestamp", "end_timestamp",
99                "__weakref__"]
100
101   def __init__(self, op):
102     """Constructor for the _QuededOpCode.
103
104     @type op: L{opcodes.OpCode}
105     @param op: the opcode we encapsulate
106
107     """
108     self.input = op
109     self.status = constants.OP_STATUS_QUEUED
110     self.result = None
111     self.log = []
112     self.start_timestamp = None
113     self.exec_timestamp = None
114     self.end_timestamp = None
115
116     # Get initial priority (it might change during the lifetime of this opcode)
117     self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
118
119   @classmethod
120   def Restore(cls, state):
121     """Restore the _QueuedOpCode from the serialized form.
122
123     @type state: dict
124     @param state: the serialized state
125     @rtype: _QueuedOpCode
126     @return: a new _QueuedOpCode instance
127
128     """
129     obj = _QueuedOpCode.__new__(cls)
130     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
131     obj.status = state["status"]
132     obj.result = state["result"]
133     obj.log = state["log"]
134     obj.start_timestamp = state.get("start_timestamp", None)
135     obj.exec_timestamp = state.get("exec_timestamp", None)
136     obj.end_timestamp = state.get("end_timestamp", None)
137     obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
138     return obj
139
140   def Serialize(self):
141     """Serializes this _QueuedOpCode.
142
143     @rtype: dict
144     @return: the dictionary holding the serialized state
145
146     """
147     return {
148       "input": self.input.__getstate__(),
149       "status": self.status,
150       "result": self.result,
151       "log": self.log,
152       "start_timestamp": self.start_timestamp,
153       "exec_timestamp": self.exec_timestamp,
154       "end_timestamp": self.end_timestamp,
155       "priority": self.priority,
156       }
157
158
159 class _QueuedJob(object):
160   """In-memory job representation.
161
162   This is what we use to track the user-submitted jobs. Locking must
163   be taken care of by users of this class.
164
165   @type queue: L{JobQueue}
166   @ivar queue: the parent queue
167   @ivar id: the job ID
168   @type ops: list
169   @ivar ops: the list of _QueuedOpCode that constitute the job
170   @type log_serial: int
171   @ivar log_serial: holds the index for the next log entry
172   @ivar received_timestamp: the timestamp for when the job was received
173   @ivar start_timestmap: the timestamp for start of execution
174   @ivar end_timestamp: the timestamp for end of execution
175
176   """
177   # pylint: disable-msg=W0212
178   __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
179                "received_timestamp", "start_timestamp", "end_timestamp",
180                "__weakref__"]
181
182   def __init__(self, queue, job_id, ops):
183     """Constructor for the _QueuedJob.
184
185     @type queue: L{JobQueue}
186     @param queue: our parent queue
187     @type job_id: job_id
188     @param job_id: our job id
189     @type ops: list
190     @param ops: the list of opcodes we hold, which will be encapsulated
191         in _QueuedOpCodes
192
193     """
194     if not ops:
195       raise errors.GenericError("A job needs at least one opcode")
196
197     self.queue = queue
198     self.id = job_id
199     self.ops = [_QueuedOpCode(op) for op in ops]
200     self.log_serial = 0
201     self.received_timestamp = TimeStampNow()
202     self.start_timestamp = None
203     self.end_timestamp = None
204
205     self._InitInMemory(self)
206
207   @staticmethod
208   def _InitInMemory(obj):
209     """Initializes in-memory variables.
210
211     """
212     obj.ops_iter = None
213     obj.cur_opctx = None
214
215   def __repr__(self):
216     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
217               "id=%s" % self.id,
218               "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
219
220     return "<%s at %#x>" % (" ".join(status), id(self))
221
222   @classmethod
223   def Restore(cls, queue, state):
224     """Restore a _QueuedJob from serialized state:
225
226     @type queue: L{JobQueue}
227     @param queue: to which queue the restored job belongs
228     @type state: dict
229     @param state: the serialized state
230     @rtype: _JobQueue
231     @return: the restored _JobQueue instance
232
233     """
234     obj = _QueuedJob.__new__(cls)
235     obj.queue = queue
236     obj.id = state["id"]
237     obj.received_timestamp = state.get("received_timestamp", None)
238     obj.start_timestamp = state.get("start_timestamp", None)
239     obj.end_timestamp = state.get("end_timestamp", None)
240
241     obj.ops = []
242     obj.log_serial = 0
243     for op_state in state["ops"]:
244       op = _QueuedOpCode.Restore(op_state)
245       for log_entry in op.log:
246         obj.log_serial = max(obj.log_serial, log_entry[0])
247       obj.ops.append(op)
248
249     cls._InitInMemory(obj)
250
251     return obj
252
253   def Serialize(self):
254     """Serialize the _JobQueue instance.
255
256     @rtype: dict
257     @return: the serialized state
258
259     """
260     return {
261       "id": self.id,
262       "ops": [op.Serialize() for op in self.ops],
263       "start_timestamp": self.start_timestamp,
264       "end_timestamp": self.end_timestamp,
265       "received_timestamp": self.received_timestamp,
266       }
267
268   def CalcStatus(self):
269     """Compute the status of this job.
270
271     This function iterates over all the _QueuedOpCodes in the job and
272     based on their status, computes the job status.
273
274     The algorithm is:
275       - if we find a cancelled, or finished with error, the job
276         status will be the same
277       - otherwise, the last opcode with the status one of:
278           - waitlock
279           - canceling
280           - running
281
282         will determine the job status
283
284       - otherwise, it means either all opcodes are queued, or success,
285         and the job status will be the same
286
287     @return: the job status
288
289     """
290     status = constants.JOB_STATUS_QUEUED
291
292     all_success = True
293     for op in self.ops:
294       if op.status == constants.OP_STATUS_SUCCESS:
295         continue
296
297       all_success = False
298
299       if op.status == constants.OP_STATUS_QUEUED:
300         pass
301       elif op.status == constants.OP_STATUS_WAITLOCK:
302         status = constants.JOB_STATUS_WAITLOCK
303       elif op.status == constants.OP_STATUS_RUNNING:
304         status = constants.JOB_STATUS_RUNNING
305       elif op.status == constants.OP_STATUS_CANCELING:
306         status = constants.JOB_STATUS_CANCELING
307         break
308       elif op.status == constants.OP_STATUS_ERROR:
309         status = constants.JOB_STATUS_ERROR
310         # The whole job fails if one opcode failed
311         break
312       elif op.status == constants.OP_STATUS_CANCELED:
313         status = constants.OP_STATUS_CANCELED
314         break
315
316     if all_success:
317       status = constants.JOB_STATUS_SUCCESS
318
319     return status
320
321   def CalcPriority(self):
322     """Gets the current priority for this job.
323
324     Only unfinished opcodes are considered. When all are done, the default
325     priority is used.
326
327     @rtype: int
328
329     """
330     priorities = [op.priority for op in self.ops
331                   if op.status not in constants.OPS_FINALIZED]
332
333     if not priorities:
334       # All opcodes are done, assume default priority
335       return constants.OP_PRIO_DEFAULT
336
337     return min(priorities)
338
339   def GetLogEntries(self, newer_than):
340     """Selectively returns the log entries.
341
342     @type newer_than: None or int
343     @param newer_than: if this is None, return all log entries,
344         otherwise return only the log entries with serial higher
345         than this value
346     @rtype: list
347     @return: the list of the log entries selected
348
349     """
350     if newer_than is None:
351       serial = -1
352     else:
353       serial = newer_than
354
355     entries = []
356     for op in self.ops:
357       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
358
359     return entries
360
361   def GetInfo(self, fields):
362     """Returns information about a job.
363
364     @type fields: list
365     @param fields: names of fields to return
366     @rtype: list
367     @return: list with one element for each field
368     @raise errors.OpExecError: when an invalid field
369         has been passed
370
371     """
372     row = []
373     for fname in fields:
374       if fname == "id":
375         row.append(self.id)
376       elif fname == "status":
377         row.append(self.CalcStatus())
378       elif fname == "priority":
379         row.append(self.CalcPriority())
380       elif fname == "ops":
381         row.append([op.input.__getstate__() for op in self.ops])
382       elif fname == "opresult":
383         row.append([op.result for op in self.ops])
384       elif fname == "opstatus":
385         row.append([op.status for op in self.ops])
386       elif fname == "oplog":
387         row.append([op.log for op in self.ops])
388       elif fname == "opstart":
389         row.append([op.start_timestamp for op in self.ops])
390       elif fname == "opexec":
391         row.append([op.exec_timestamp for op in self.ops])
392       elif fname == "opend":
393         row.append([op.end_timestamp for op in self.ops])
394       elif fname == "oppriority":
395         row.append([op.priority for op in self.ops])
396       elif fname == "received_ts":
397         row.append(self.received_timestamp)
398       elif fname == "start_ts":
399         row.append(self.start_timestamp)
400       elif fname == "end_ts":
401         row.append(self.end_timestamp)
402       elif fname == "summary":
403         row.append([op.input.Summary() for op in self.ops])
404       else:
405         raise errors.OpExecError("Invalid self query field '%s'" % fname)
406     return row
407
408   def MarkUnfinishedOps(self, status, result):
409     """Mark unfinished opcodes with a given status and result.
410
411     This is an utility function for marking all running or waiting to
412     be run opcodes with a given status. Opcodes which are already
413     finalised are not changed.
414
415     @param status: a given opcode status
416     @param result: the opcode result
417
418     """
419     not_marked = True
420     for op in self.ops:
421       if op.status in constants.OPS_FINALIZED:
422         assert not_marked, "Finalized opcodes found after non-finalized ones"
423         continue
424       op.status = status
425       op.result = result
426       not_marked = False
427
428   def Cancel(self):
429     """Marks job as canceled/-ing if possible.
430
431     @rtype: tuple; (bool, string)
432     @return: Boolean describing whether job was successfully canceled or marked
433       as canceling and a text message
434
435     """
436     status = self.CalcStatus()
437
438     if status == constants.JOB_STATUS_QUEUED:
439       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
440                              "Job canceled by request")
441       return (True, "Job %s canceled" % self.id)
442
443     elif status == constants.JOB_STATUS_WAITLOCK:
444       # The worker will notice the new status and cancel the job
445       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
446       return (True, "Job %s will be canceled" % self.id)
447
448     else:
449       logging.debug("Job %s is no longer waiting in the queue", self.id)
450       return (False, "Job %s is no longer waiting in the queue" % self.id)
451
452
453 class _OpExecCallbacks(mcpu.OpExecCbBase):
454   def __init__(self, queue, job, op):
455     """Initializes this class.
456
457     @type queue: L{JobQueue}
458     @param queue: Job queue
459     @type job: L{_QueuedJob}
460     @param job: Job object
461     @type op: L{_QueuedOpCode}
462     @param op: OpCode
463
464     """
465     assert queue, "Queue is missing"
466     assert job, "Job is missing"
467     assert op, "Opcode is missing"
468
469     self._queue = queue
470     self._job = job
471     self._op = op
472
473   def _CheckCancel(self):
474     """Raises an exception to cancel the job if asked to.
475
476     """
477     # Cancel here if we were asked to
478     if self._op.status == constants.OP_STATUS_CANCELING:
479       logging.debug("Canceling opcode")
480       raise CancelJob()
481
482   @locking.ssynchronized(_QUEUE, shared=1)
483   def NotifyStart(self):
484     """Mark the opcode as running, not lock-waiting.
485
486     This is called from the mcpu code as a notifier function, when the LU is
487     finally about to start the Exec() method. Of course, to have end-user
488     visible results, the opcode must be initially (before calling into
489     Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
490
491     """
492     assert self._op in self._job.ops
493     assert self._op.status in (constants.OP_STATUS_WAITLOCK,
494                                constants.OP_STATUS_CANCELING)
495
496     # Cancel here if we were asked to
497     self._CheckCancel()
498
499     logging.debug("Opcode is now running")
500
501     self._op.status = constants.OP_STATUS_RUNNING
502     self._op.exec_timestamp = TimeStampNow()
503
504     # And finally replicate the job status
505     self._queue.UpdateJobUnlocked(self._job)
506
507   @locking.ssynchronized(_QUEUE, shared=1)
508   def _AppendFeedback(self, timestamp, log_type, log_msg):
509     """Internal feedback append function, with locks
510
511     """
512     self._job.log_serial += 1
513     self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
514     self._queue.UpdateJobUnlocked(self._job, replicate=False)
515
516   def Feedback(self, *args):
517     """Append a log entry.
518
519     """
520     assert len(args) < 3
521
522     if len(args) == 1:
523       log_type = constants.ELOG_MESSAGE
524       log_msg = args[0]
525     else:
526       (log_type, log_msg) = args
527
528     # The time is split to make serialization easier and not lose
529     # precision.
530     timestamp = utils.SplitTime(time.time())
531     self._AppendFeedback(timestamp, log_type, log_msg)
532
533   def CheckCancel(self):
534     """Check whether job has been cancelled.
535
536     """
537     assert self._op.status in (constants.OP_STATUS_WAITLOCK,
538                                constants.OP_STATUS_CANCELING)
539
540     # Cancel here if we were asked to
541     self._CheckCancel()
542
543
544 class _JobChangesChecker(object):
545   def __init__(self, fields, prev_job_info, prev_log_serial):
546     """Initializes this class.
547
548     @type fields: list of strings
549     @param fields: Fields requested by LUXI client
550     @type prev_job_info: string
551     @param prev_job_info: previous job info, as passed by the LUXI client
552     @type prev_log_serial: string
553     @param prev_log_serial: previous job serial, as passed by the LUXI client
554
555     """
556     self._fields = fields
557     self._prev_job_info = prev_job_info
558     self._prev_log_serial = prev_log_serial
559
560   def __call__(self, job):
561     """Checks whether job has changed.
562
563     @type job: L{_QueuedJob}
564     @param job: Job object
565
566     """
567     status = job.CalcStatus()
568     job_info = job.GetInfo(self._fields)
569     log_entries = job.GetLogEntries(self._prev_log_serial)
570
571     # Serializing and deserializing data can cause type changes (e.g. from
572     # tuple to list) or precision loss. We're doing it here so that we get
573     # the same modifications as the data received from the client. Without
574     # this, the comparison afterwards might fail without the data being
575     # significantly different.
576     # TODO: we just deserialized from disk, investigate how to make sure that
577     # the job info and log entries are compatible to avoid this further step.
578     # TODO: Doing something like in testutils.py:UnifyValueType might be more
579     # efficient, though floats will be tricky
580     job_info = serializer.LoadJson(serializer.DumpJson(job_info))
581     log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
582
583     # Don't even try to wait if the job is no longer running, there will be
584     # no changes.
585     if (status not in (constants.JOB_STATUS_QUEUED,
586                        constants.JOB_STATUS_RUNNING,
587                        constants.JOB_STATUS_WAITLOCK) or
588         job_info != self._prev_job_info or
589         (log_entries and self._prev_log_serial != log_entries[0][0])):
590       logging.debug("Job %s changed", job.id)
591       return (job_info, log_entries)
592
593     return None
594
595
596 class _JobFileChangesWaiter(object):
597   def __init__(self, filename):
598     """Initializes this class.
599
600     @type filename: string
601     @param filename: Path to job file
602     @raises errors.InotifyError: if the notifier cannot be setup
603
604     """
605     self._wm = pyinotify.WatchManager()
606     self._inotify_handler = \
607       asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
608     self._notifier = \
609       pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
610     try:
611       self._inotify_handler.enable()
612     except Exception:
613       # pyinotify doesn't close file descriptors automatically
614       self._notifier.stop()
615       raise
616
617   def _OnInotify(self, notifier_enabled):
618     """Callback for inotify.
619
620     """
621     if not notifier_enabled:
622       self._inotify_handler.enable()
623
624   def Wait(self, timeout):
625     """Waits for the job file to change.
626
627     @type timeout: float
628     @param timeout: Timeout in seconds
629     @return: Whether there have been events
630
631     """
632     assert timeout >= 0
633     have_events = self._notifier.check_events(timeout * 1000)
634     if have_events:
635       self._notifier.read_events()
636     self._notifier.process_events()
637     return have_events
638
639   def Close(self):
640     """Closes underlying notifier and its file descriptor.
641
642     """
643     self._notifier.stop()
644
645
646 class _JobChangesWaiter(object):
647   def __init__(self, filename):
648     """Initializes this class.
649
650     @type filename: string
651     @param filename: Path to job file
652
653     """
654     self._filewaiter = None
655     self._filename = filename
656
657   def Wait(self, timeout):
658     """Waits for a job to change.
659
660     @type timeout: float
661     @param timeout: Timeout in seconds
662     @return: Whether there have been events
663
664     """
665     if self._filewaiter:
666       return self._filewaiter.Wait(timeout)
667
668     # Lazy setup: Avoid inotify setup cost when job file has already changed.
669     # If this point is reached, return immediately and let caller check the job
670     # file again in case there were changes since the last check. This avoids a
671     # race condition.
672     self._filewaiter = _JobFileChangesWaiter(self._filename)
673
674     return True
675
676   def Close(self):
677     """Closes underlying waiter.
678
679     """
680     if self._filewaiter:
681       self._filewaiter.Close()
682
683
684 class _WaitForJobChangesHelper(object):
685   """Helper class using inotify to wait for changes in a job file.
686
687   This class takes a previous job status and serial, and alerts the client when
688   the current job status has changed.
689
690   """
691   @staticmethod
692   def _CheckForChanges(job_load_fn, check_fn):
693     job = job_load_fn()
694     if not job:
695       raise errors.JobLost()
696
697     result = check_fn(job)
698     if result is None:
699       raise utils.RetryAgain()
700
701     return result
702
703   def __call__(self, filename, job_load_fn,
704                fields, prev_job_info, prev_log_serial, timeout):
705     """Waits for changes on a job.
706
707     @type filename: string
708     @param filename: File on which to wait for changes
709     @type job_load_fn: callable
710     @param job_load_fn: Function to load job
711     @type fields: list of strings
712     @param fields: Which fields to check for changes
713     @type prev_job_info: list or None
714     @param prev_job_info: Last job information returned
715     @type prev_log_serial: int
716     @param prev_log_serial: Last job message serial number
717     @type timeout: float
718     @param timeout: maximum time to wait in seconds
719
720     """
721     try:
722       check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
723       waiter = _JobChangesWaiter(filename)
724       try:
725         return utils.Retry(compat.partial(self._CheckForChanges,
726                                           job_load_fn, check_fn),
727                            utils.RETRY_REMAINING_TIME, timeout,
728                            wait_fn=waiter.Wait)
729       finally:
730         waiter.Close()
731     except (errors.InotifyError, errors.JobLost):
732       return None
733     except utils.RetryTimeout:
734       return constants.JOB_NOTCHANGED
735
736
737 def _EncodeOpError(err):
738   """Encodes an error which occurred while processing an opcode.
739
740   """
741   if isinstance(err, errors.GenericError):
742     to_encode = err
743   else:
744     to_encode = errors.OpExecError(str(err))
745
746   return errors.EncodeException(to_encode)
747
748
749 class _TimeoutStrategyWrapper:
750   def __init__(self, fn):
751     """Initializes this class.
752
753     """
754     self._fn = fn
755     self._next = None
756
757   def _Advance(self):
758     """Gets the next timeout if necessary.
759
760     """
761     if self._next is None:
762       self._next = self._fn()
763
764   def Peek(self):
765     """Returns the next timeout.
766
767     """
768     self._Advance()
769     return self._next
770
771   def Next(self):
772     """Returns the current timeout and advances the internal state.
773
774     """
775     self._Advance()
776     result = self._next
777     self._next = None
778     return result
779
780
781 class _OpExecContext:
782   def __init__(self, op, index, log_prefix, timeout_strategy_factory):
783     """Initializes this class.
784
785     """
786     self.op = op
787     self.index = index
788     self.log_prefix = log_prefix
789     self.summary = op.input.Summary()
790
791     self._timeout_strategy_factory = timeout_strategy_factory
792     self._ResetTimeoutStrategy()
793
794   def _ResetTimeoutStrategy(self):
795     """Creates a new timeout strategy.
796
797     """
798     self._timeout_strategy = \
799       _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
800
801   def CheckPriorityIncrease(self):
802     """Checks whether priority can and should be increased.
803
804     Called when locks couldn't be acquired.
805
806     """
807     op = self.op
808
809     # Exhausted all retries and next round should not use blocking acquire
810     # for locks?
811     if (self._timeout_strategy.Peek() is None and
812         op.priority > constants.OP_PRIO_HIGHEST):
813       logging.debug("Increasing priority")
814       op.priority -= 1
815       self._ResetTimeoutStrategy()
816       return True
817
818     return False
819
820   def GetNextLockTimeout(self):
821     """Returns the next lock acquire timeout.
822
823     """
824     return self._timeout_strategy.Next()
825
826
827 class _JobProcessor(object):
828   def __init__(self, queue, opexec_fn, job,
829                _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
830     """Initializes this class.
831
832     """
833     self.queue = queue
834     self.opexec_fn = opexec_fn
835     self.job = job
836     self._timeout_strategy_factory = _timeout_strategy_factory
837
838   @staticmethod
839   def _FindNextOpcode(job, timeout_strategy_factory):
840     """Locates the next opcode to run.
841
842     @type job: L{_QueuedJob}
843     @param job: Job object
844     @param timeout_strategy_factory: Callable to create new timeout strategy
845
846     """
847     # Create some sort of a cache to speed up locating next opcode for future
848     # lookups
849     # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
850     # pending and one for processed ops.
851     if job.ops_iter is None:
852       job.ops_iter = enumerate(job.ops)
853
854     # Find next opcode to run
855     while True:
856       try:
857         (idx, op) = job.ops_iter.next()
858       except StopIteration:
859         raise errors.ProgrammerError("Called for a finished job")
860
861       if op.status == constants.OP_STATUS_RUNNING:
862         # Found an opcode already marked as running
863         raise errors.ProgrammerError("Called for job marked as running")
864
865       opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
866                              timeout_strategy_factory)
867
868       if op.status == constants.OP_STATUS_CANCELED:
869         # Cancelled jobs are handled by the caller
870         assert not compat.any(i.status != constants.OP_STATUS_CANCELED
871                               for i in job.ops[idx:])
872
873       elif op.status in constants.OPS_FINALIZED:
874         # This is a job that was partially completed before master daemon
875         # shutdown, so it can be expected that some opcodes are already
876         # completed successfully (if any did error out, then the whole job
877         # should have been aborted and not resubmitted for processing).
878         logging.info("%s: opcode %s already processed, skipping",
879                      opctx.log_prefix, opctx.summary)
880         continue
881
882       return opctx
883
884   @staticmethod
885   def _MarkWaitlock(job, op):
886     """Marks an opcode as waiting for locks.
887
888     The job's start timestamp is also set if necessary.
889
890     @type job: L{_QueuedJob}
891     @param job: Job object
892     @type op: L{_QueuedOpCode}
893     @param op: Opcode object
894
895     """
896     assert op in job.ops
897     assert op.status in (constants.OP_STATUS_QUEUED,
898                          constants.OP_STATUS_WAITLOCK)
899
900     update = False
901
902     op.result = None
903
904     if op.status == constants.OP_STATUS_QUEUED:
905       op.status = constants.OP_STATUS_WAITLOCK
906       update = True
907
908     if op.start_timestamp is None:
909       op.start_timestamp = TimeStampNow()
910       update = True
911
912     if job.start_timestamp is None:
913       job.start_timestamp = op.start_timestamp
914       update = True
915
916     assert op.status == constants.OP_STATUS_WAITLOCK
917
918     return update
919
920   def _ExecOpCodeUnlocked(self, opctx):
921     """Processes one opcode and returns the result.
922
923     """
924     op = opctx.op
925
926     assert op.status == constants.OP_STATUS_WAITLOCK
927
928     timeout = opctx.GetNextLockTimeout()
929
930     try:
931       # Make sure not to hold queue lock while calling ExecOpCode
932       result = self.opexec_fn(op.input,
933                               _OpExecCallbacks(self.queue, self.job, op),
934                               timeout=timeout, priority=op.priority)
935     except mcpu.LockAcquireTimeout:
936       assert timeout is not None, "Received timeout for blocking acquire"
937       logging.debug("Couldn't acquire locks in %0.6fs", timeout)
938
939       assert op.status in (constants.OP_STATUS_WAITLOCK,
940                            constants.OP_STATUS_CANCELING)
941
942       # Was job cancelled while we were waiting for the lock?
943       if op.status == constants.OP_STATUS_CANCELING:
944         return (constants.OP_STATUS_CANCELING, None)
945
946       # Stay in waitlock while trying to re-acquire lock
947       return (constants.OP_STATUS_WAITLOCK, None)
948     except CancelJob:
949       logging.exception("%s: Canceling job", opctx.log_prefix)
950       assert op.status == constants.OP_STATUS_CANCELING
951       return (constants.OP_STATUS_CANCELING, None)
952     except Exception, err: # pylint: disable-msg=W0703
953       logging.exception("%s: Caught exception in %s",
954                         opctx.log_prefix, opctx.summary)
955       return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
956     else:
957       logging.debug("%s: %s successful",
958                     opctx.log_prefix, opctx.summary)
959       return (constants.OP_STATUS_SUCCESS, result)
960
961   def __call__(self, _nextop_fn=None):
962     """Continues execution of a job.
963
964     @param _nextop_fn: Callback function for tests
965     @rtype: bool
966     @return: True if job is finished, False if processor needs to be called
967              again
968
969     """
970     queue = self.queue
971     job = self.job
972
973     logging.debug("Processing job %s", job.id)
974
975     queue.acquire(shared=1)
976     try:
977       opcount = len(job.ops)
978
979       # Is a previous opcode still pending?
980       if job.cur_opctx:
981         opctx = job.cur_opctx
982         job.cur_opctx = None
983       else:
984         if __debug__ and _nextop_fn:
985           _nextop_fn()
986         opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
987
988       op = opctx.op
989
990       # Consistency check
991       assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
992                                      constants.OP_STATUS_CANCELING,
993                                      constants.OP_STATUS_CANCELED)
994                         for i in job.ops[opctx.index + 1:])
995
996       assert op.status in (constants.OP_STATUS_QUEUED,
997                            constants.OP_STATUS_WAITLOCK,
998                            constants.OP_STATUS_CANCELING,
999                            constants.OP_STATUS_CANCELED)
1000
1001       assert (op.priority <= constants.OP_PRIO_LOWEST and
1002               op.priority >= constants.OP_PRIO_HIGHEST)
1003
1004       if op.status not in (constants.OP_STATUS_CANCELING,
1005                            constants.OP_STATUS_CANCELED):
1006         assert op.status in (constants.OP_STATUS_QUEUED,
1007                              constants.OP_STATUS_WAITLOCK)
1008
1009         # Prepare to start opcode
1010         if self._MarkWaitlock(job, op):
1011           # Write to disk
1012           queue.UpdateJobUnlocked(job)
1013
1014         assert op.status == constants.OP_STATUS_WAITLOCK
1015         assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1016         assert job.start_timestamp and op.start_timestamp
1017
1018         logging.info("%s: opcode %s waiting for locks",
1019                      opctx.log_prefix, opctx.summary)
1020
1021         queue.release()
1022         try:
1023           (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1024         finally:
1025           queue.acquire(shared=1)
1026
1027         op.status = op_status
1028         op.result = op_result
1029
1030         if op.status == constants.OP_STATUS_WAITLOCK:
1031           # Couldn't get locks in time
1032           assert not op.end_timestamp
1033         else:
1034           # Finalize opcode
1035           op.end_timestamp = TimeStampNow()
1036
1037           if op.status == constants.OP_STATUS_CANCELING:
1038             assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1039                                   for i in job.ops[opctx.index:])
1040           else:
1041             assert op.status in constants.OPS_FINALIZED
1042
1043       if op.status == constants.OP_STATUS_WAITLOCK:
1044         finalize = False
1045
1046         if opctx.CheckPriorityIncrease():
1047           # Priority was changed, need to update on-disk file
1048           queue.UpdateJobUnlocked(job)
1049
1050         # Keep around for another round
1051         job.cur_opctx = opctx
1052
1053         assert (op.priority <= constants.OP_PRIO_LOWEST and
1054                 op.priority >= constants.OP_PRIO_HIGHEST)
1055
1056         # In no case must the status be finalized here
1057         assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1058
1059       else:
1060         # Ensure all opcodes so far have been successful
1061         assert (opctx.index == 0 or
1062                 compat.all(i.status == constants.OP_STATUS_SUCCESS
1063                            for i in job.ops[:opctx.index]))
1064
1065         # Reset context
1066         job.cur_opctx = None
1067
1068         if op.status == constants.OP_STATUS_SUCCESS:
1069           finalize = False
1070
1071         elif op.status == constants.OP_STATUS_ERROR:
1072           # Ensure failed opcode has an exception as its result
1073           assert errors.GetEncodedError(job.ops[opctx.index].result)
1074
1075           to_encode = errors.OpExecError("Preceding opcode failed")
1076           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1077                                 _EncodeOpError(to_encode))
1078           finalize = True
1079
1080           # Consistency check
1081           assert compat.all(i.status == constants.OP_STATUS_ERROR and
1082                             errors.GetEncodedError(i.result)
1083                             for i in job.ops[opctx.index:])
1084
1085         elif op.status == constants.OP_STATUS_CANCELING:
1086           job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1087                                 "Job canceled by request")
1088           finalize = True
1089
1090         elif op.status == constants.OP_STATUS_CANCELED:
1091           finalize = True
1092
1093         else:
1094           raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1095
1096         # Finalizing or last opcode?
1097         if finalize or opctx.index == (opcount - 1):
1098           # All opcodes have been run, finalize job
1099           job.end_timestamp = TimeStampNow()
1100
1101         # Write to disk. If the job status is final, this is the final write
1102         # allowed. Once the file has been written, it can be archived anytime.
1103         queue.UpdateJobUnlocked(job)
1104
1105         if finalize or opctx.index == (opcount - 1):
1106           logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1107           return True
1108
1109       return False
1110     finally:
1111       queue.release()
1112
1113
1114 class _JobQueueWorker(workerpool.BaseWorker):
1115   """The actual job workers.
1116
1117   """
1118   def RunTask(self, job): # pylint: disable-msg=W0221
1119     """Job executor.
1120
1121     This functions processes a job. It is closely tied to the L{_QueuedJob} and
1122     L{_QueuedOpCode} classes.
1123
1124     @type job: L{_QueuedJob}
1125     @param job: the job to be processed
1126
1127     """
1128     queue = job.queue
1129     assert queue == self.pool.queue
1130
1131     self.SetTaskName("Job%s" % job.id)
1132
1133     proc = mcpu.Processor(queue.context, job.id)
1134
1135     if not _JobProcessor(queue, proc.ExecOpCode, job)():
1136       # Schedule again
1137       raise workerpool.DeferTask(priority=job.CalcPriority())
1138
1139
1140 class _JobQueueWorkerPool(workerpool.WorkerPool):
1141   """Simple class implementing a job-processing workerpool.
1142
1143   """
1144   def __init__(self, queue):
1145     super(_JobQueueWorkerPool, self).__init__("JobQueue",
1146                                               JOBQUEUE_THREADS,
1147                                               _JobQueueWorker)
1148     self.queue = queue
1149
1150
1151 def _RequireOpenQueue(fn):
1152   """Decorator for "public" functions.
1153
1154   This function should be used for all 'public' functions. That is,
1155   functions usually called from other classes. Note that this should
1156   be applied only to methods (not plain functions), since it expects
1157   that the decorated function is called with a first argument that has
1158   a '_queue_filelock' argument.
1159
1160   @warning: Use this decorator only after locking.ssynchronized
1161
1162   Example::
1163     @locking.ssynchronized(_LOCK)
1164     @_RequireOpenQueue
1165     def Example(self):
1166       pass
1167
1168   """
1169   def wrapper(self, *args, **kwargs):
1170     # pylint: disable-msg=W0212
1171     assert self._queue_filelock is not None, "Queue should be open"
1172     return fn(self, *args, **kwargs)
1173   return wrapper
1174
1175
1176 class JobQueue(object):
1177   """Queue used to manage the jobs.
1178
1179   @cvar _RE_JOB_FILE: regex matching the valid job file names
1180
1181   """
1182   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
1183
1184   def __init__(self, context):
1185     """Constructor for JobQueue.
1186
1187     The constructor will initialize the job queue object and then
1188     start loading the current jobs from disk, either for starting them
1189     (if they were queue) or for aborting them (if they were already
1190     running).
1191
1192     @type context: GanetiContext
1193     @param context: the context object for access to the configuration
1194         data and other ganeti objects
1195
1196     """
1197     self.context = context
1198     self._memcache = weakref.WeakValueDictionary()
1199     self._my_hostname = netutils.Hostname.GetSysName()
1200
1201     # The Big JobQueue lock. If a code block or method acquires it in shared
1202     # mode safe it must guarantee concurrency with all the code acquiring it in
1203     # shared mode, including itself. In order not to acquire it at all
1204     # concurrency must be guaranteed with all code acquiring it in shared mode
1205     # and all code acquiring it exclusively.
1206     self._lock = locking.SharedLock("JobQueue")
1207
1208     self.acquire = self._lock.acquire
1209     self.release = self._lock.release
1210
1211     # Initialize the queue, and acquire the filelock.
1212     # This ensures no other process is working on the job queue.
1213     self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1214
1215     # Read serial file
1216     self._last_serial = jstore.ReadSerial()
1217     assert self._last_serial is not None, ("Serial file was modified between"
1218                                            " check in jstore and here")
1219
1220     # Get initial list of nodes
1221     self._nodes = dict((n.name, n.primary_ip)
1222                        for n in self.context.cfg.GetAllNodesInfo().values()
1223                        if n.master_candidate)
1224
1225     # Remove master node
1226     self._nodes.pop(self._my_hostname, None)
1227
1228     # TODO: Check consistency across nodes
1229
1230     self._queue_size = 0
1231     self._UpdateQueueSizeUnlocked()
1232     self._drained = jstore.CheckDrainFlag()
1233
1234     # Setup worker pool
1235     self._wpool = _JobQueueWorkerPool(self)
1236     try:
1237       self._InspectQueue()
1238     except:
1239       self._wpool.TerminateWorkers()
1240       raise
1241
1242   @locking.ssynchronized(_LOCK)
1243   @_RequireOpenQueue
1244   def _InspectQueue(self):
1245     """Loads the whole job queue and resumes unfinished jobs.
1246
1247     This function needs the lock here because WorkerPool.AddTask() may start a
1248     job while we're still doing our work.
1249
1250     """
1251     logging.info("Inspecting job queue")
1252
1253     restartjobs = []
1254
1255     all_job_ids = self._GetJobIDsUnlocked()
1256     jobs_count = len(all_job_ids)
1257     lastinfo = time.time()
1258     for idx, job_id in enumerate(all_job_ids):
1259       # Give an update every 1000 jobs or 10 seconds
1260       if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1261           idx == (jobs_count - 1)):
1262         logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1263                      idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1264         lastinfo = time.time()
1265
1266       job = self._LoadJobUnlocked(job_id)
1267
1268       # a failure in loading the job can cause 'None' to be returned
1269       if job is None:
1270         continue
1271
1272       status = job.CalcStatus()
1273
1274       if status == constants.JOB_STATUS_QUEUED:
1275         restartjobs.append(job)
1276
1277       elif status in (constants.JOB_STATUS_RUNNING,
1278                       constants.JOB_STATUS_WAITLOCK,
1279                       constants.JOB_STATUS_CANCELING):
1280         logging.warning("Unfinished job %s found: %s", job.id, job)
1281
1282         if status == constants.JOB_STATUS_WAITLOCK:
1283           # Restart job
1284           job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1285           restartjobs.append(job)
1286         else:
1287           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1288                                 "Unclean master daemon shutdown")
1289
1290         self.UpdateJobUnlocked(job)
1291
1292     if restartjobs:
1293       logging.info("Restarting %s jobs", len(restartjobs))
1294       self._EnqueueJobs(restartjobs)
1295
1296     logging.info("Job queue inspection finished")
1297
1298   @locking.ssynchronized(_LOCK)
1299   @_RequireOpenQueue
1300   def AddNode(self, node):
1301     """Register a new node with the queue.
1302
1303     @type node: L{objects.Node}
1304     @param node: the node object to be added
1305
1306     """
1307     node_name = node.name
1308     assert node_name != self._my_hostname
1309
1310     # Clean queue directory on added node
1311     result = rpc.RpcRunner.call_jobqueue_purge(node_name)
1312     msg = result.fail_msg
1313     if msg:
1314       logging.warning("Cannot cleanup queue directory on node %s: %s",
1315                       node_name, msg)
1316
1317     if not node.master_candidate:
1318       # remove if existing, ignoring errors
1319       self._nodes.pop(node_name, None)
1320       # and skip the replication of the job ids
1321       return
1322
1323     # Upload the whole queue excluding archived jobs
1324     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1325
1326     # Upload current serial file
1327     files.append(constants.JOB_QUEUE_SERIAL_FILE)
1328
1329     for file_name in files:
1330       # Read file content
1331       content = utils.ReadFile(file_name)
1332
1333       result = rpc.RpcRunner.call_jobqueue_update([node_name],
1334                                                   [node.primary_ip],
1335                                                   file_name, content)
1336       msg = result[node_name].fail_msg
1337       if msg:
1338         logging.error("Failed to upload file %s to node %s: %s",
1339                       file_name, node_name, msg)
1340
1341     self._nodes[node_name] = node.primary_ip
1342
1343   @locking.ssynchronized(_LOCK)
1344   @_RequireOpenQueue
1345   def RemoveNode(self, node_name):
1346     """Callback called when removing nodes from the cluster.
1347
1348     @type node_name: str
1349     @param node_name: the name of the node to remove
1350
1351     """
1352     self._nodes.pop(node_name, None)
1353
1354   @staticmethod
1355   def _CheckRpcResult(result, nodes, failmsg):
1356     """Verifies the status of an RPC call.
1357
1358     Since we aim to keep consistency should this node (the current
1359     master) fail, we will log errors if our rpc fail, and especially
1360     log the case when more than half of the nodes fails.
1361
1362     @param result: the data as returned from the rpc call
1363     @type nodes: list
1364     @param nodes: the list of nodes we made the call to
1365     @type failmsg: str
1366     @param failmsg: the identifier to be used for logging
1367
1368     """
1369     failed = []
1370     success = []
1371
1372     for node in nodes:
1373       msg = result[node].fail_msg
1374       if msg:
1375         failed.append(node)
1376         logging.error("RPC call %s (%s) failed on node %s: %s",
1377                       result[node].call, failmsg, node, msg)
1378       else:
1379         success.append(node)
1380
1381     # +1 for the master node
1382     if (len(success) + 1) < len(failed):
1383       # TODO: Handle failing nodes
1384       logging.error("More than half of the nodes failed")
1385
1386   def _GetNodeIp(self):
1387     """Helper for returning the node name/ip list.
1388
1389     @rtype: (list, list)
1390     @return: a tuple of two lists, the first one with the node
1391         names and the second one with the node addresses
1392
1393     """
1394     # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1395     name_list = self._nodes.keys()
1396     addr_list = [self._nodes[name] for name in name_list]
1397     return name_list, addr_list
1398
1399   def _UpdateJobQueueFile(self, file_name, data, replicate):
1400     """Writes a file locally and then replicates it to all nodes.
1401
1402     This function will replace the contents of a file on the local
1403     node and then replicate it to all the other nodes we have.
1404
1405     @type file_name: str
1406     @param file_name: the path of the file to be replicated
1407     @type data: str
1408     @param data: the new contents of the file
1409     @type replicate: boolean
1410     @param replicate: whether to spread the changes to the remote nodes
1411
1412     """
1413     getents = runtime.GetEnts()
1414     utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1415                     gid=getents.masterd_gid)
1416
1417     if replicate:
1418       names, addrs = self._GetNodeIp()
1419       result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1420       self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1421
1422   def _RenameFilesUnlocked(self, rename):
1423     """Renames a file locally and then replicate the change.
1424
1425     This function will rename a file in the local queue directory
1426     and then replicate this rename to all the other nodes we have.
1427
1428     @type rename: list of (old, new)
1429     @param rename: List containing tuples mapping old to new names
1430
1431     """
1432     # Rename them locally
1433     for old, new in rename:
1434       utils.RenameFile(old, new, mkdir=True)
1435
1436     # ... and on all nodes
1437     names, addrs = self._GetNodeIp()
1438     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1439     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1440
1441   @staticmethod
1442   def _FormatJobID(job_id):
1443     """Convert a job ID to string format.
1444
1445     Currently this just does C{str(job_id)} after performing some
1446     checks, but if we want to change the job id format this will
1447     abstract this change.
1448
1449     @type job_id: int or long
1450     @param job_id: the numeric job id
1451     @rtype: str
1452     @return: the formatted job id
1453
1454     """
1455     if not isinstance(job_id, (int, long)):
1456       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1457     if job_id < 0:
1458       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1459
1460     return str(job_id)
1461
1462   @classmethod
1463   def _GetArchiveDirectory(cls, job_id):
1464     """Returns the archive directory for a job.
1465
1466     @type job_id: str
1467     @param job_id: Job identifier
1468     @rtype: str
1469     @return: Directory name
1470
1471     """
1472     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
1473
1474   def _NewSerialsUnlocked(self, count):
1475     """Generates a new job identifier.
1476
1477     Job identifiers are unique during the lifetime of a cluster.
1478
1479     @type count: integer
1480     @param count: how many serials to return
1481     @rtype: str
1482     @return: a string representing the job identifier.
1483
1484     """
1485     assert count > 0
1486     # New number
1487     serial = self._last_serial + count
1488
1489     # Write to file
1490     self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1491                              "%s\n" % serial, True)
1492
1493     result = [self._FormatJobID(v)
1494               for v in range(self._last_serial, serial + 1)]
1495     # Keep it only if we were able to write the file
1496     self._last_serial = serial
1497
1498     return result
1499
1500   @staticmethod
1501   def _GetJobPath(job_id):
1502     """Returns the job file for a given job id.
1503
1504     @type job_id: str
1505     @param job_id: the job identifier
1506     @rtype: str
1507     @return: the path to the job file
1508
1509     """
1510     return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1511
1512   @classmethod
1513   def _GetArchivedJobPath(cls, job_id):
1514     """Returns the archived job file for a give job id.
1515
1516     @type job_id: str
1517     @param job_id: the job identifier
1518     @rtype: str
1519     @return: the path to the archived job file
1520
1521     """
1522     return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1523                           cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1524
1525   def _GetJobIDsUnlocked(self, sort=True):
1526     """Return all known job IDs.
1527
1528     The method only looks at disk because it's a requirement that all
1529     jobs are present on disk (so in the _memcache we don't have any
1530     extra IDs).
1531
1532     @type sort: boolean
1533     @param sort: perform sorting on the returned job ids
1534     @rtype: list
1535     @return: the list of job IDs
1536
1537     """
1538     jlist = []
1539     for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1540       m = self._RE_JOB_FILE.match(filename)
1541       if m:
1542         jlist.append(m.group(1))
1543     if sort:
1544       jlist = utils.NiceSort(jlist)
1545     return jlist
1546
1547   def _LoadJobUnlocked(self, job_id):
1548     """Loads a job from the disk or memory.
1549
1550     Given a job id, this will return the cached job object if
1551     existing, or try to load the job from the disk. If loading from
1552     disk, it will also add the job to the cache.
1553
1554     @param job_id: the job id
1555     @rtype: L{_QueuedJob} or None
1556     @return: either None or the job object
1557
1558     """
1559     job = self._memcache.get(job_id, None)
1560     if job:
1561       logging.debug("Found job %s in memcache", job_id)
1562       return job
1563
1564     try:
1565       job = self._LoadJobFromDisk(job_id)
1566       if job is None:
1567         return job
1568     except errors.JobFileCorrupted:
1569       old_path = self._GetJobPath(job_id)
1570       new_path = self._GetArchivedJobPath(job_id)
1571       if old_path == new_path:
1572         # job already archived (future case)
1573         logging.exception("Can't parse job %s", job_id)
1574       else:
1575         # non-archived case
1576         logging.exception("Can't parse job %s, will archive.", job_id)
1577         self._RenameFilesUnlocked([(old_path, new_path)])
1578       return None
1579
1580     self._memcache[job_id] = job
1581     logging.debug("Added job %s to the cache", job_id)
1582     return job
1583
1584   def _LoadJobFromDisk(self, job_id):
1585     """Load the given job file from disk.
1586
1587     Given a job file, read, load and restore it in a _QueuedJob format.
1588
1589     @type job_id: string
1590     @param job_id: job identifier
1591     @rtype: L{_QueuedJob} or None
1592     @return: either None or the job object
1593
1594     """
1595     filepath = self._GetJobPath(job_id)
1596     logging.debug("Loading job from %s", filepath)
1597     try:
1598       raw_data = utils.ReadFile(filepath)
1599     except EnvironmentError, err:
1600       if err.errno in (errno.ENOENT, ):
1601         return None
1602       raise
1603
1604     try:
1605       data = serializer.LoadJson(raw_data)
1606       job = _QueuedJob.Restore(self, data)
1607     except Exception, err: # pylint: disable-msg=W0703
1608       raise errors.JobFileCorrupted(err)
1609
1610     return job
1611
1612   def SafeLoadJobFromDisk(self, job_id):
1613     """Load the given job file from disk.
1614
1615     Given a job file, read, load and restore it in a _QueuedJob format.
1616     In case of error reading the job, it gets returned as None, and the
1617     exception is logged.
1618
1619     @type job_id: string
1620     @param job_id: job identifier
1621     @rtype: L{_QueuedJob} or None
1622     @return: either None or the job object
1623
1624     """
1625     try:
1626       return self._LoadJobFromDisk(job_id)
1627     except (errors.JobFileCorrupted, EnvironmentError):
1628       logging.exception("Can't load/parse job %s", job_id)
1629       return None
1630
1631   def _UpdateQueueSizeUnlocked(self):
1632     """Update the queue size.
1633
1634     """
1635     self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1636
1637   @locking.ssynchronized(_LOCK)
1638   @_RequireOpenQueue
1639   def SetDrainFlag(self, drain_flag):
1640     """Sets the drain flag for the queue.
1641
1642     @type drain_flag: boolean
1643     @param drain_flag: Whether to set or unset the drain flag
1644
1645     """
1646     jstore.SetDrainFlag(drain_flag)
1647
1648     self._drained = drain_flag
1649
1650     return True
1651
1652   @_RequireOpenQueue
1653   def _SubmitJobUnlocked(self, job_id, ops):
1654     """Create and store a new job.
1655
1656     This enters the job into our job queue and also puts it on the new
1657     queue, in order for it to be picked up by the queue processors.
1658
1659     @type job_id: job ID
1660     @param job_id: the job ID for the new job
1661     @type ops: list
1662     @param ops: The list of OpCodes that will become the new job.
1663     @rtype: L{_QueuedJob}
1664     @return: the job object to be queued
1665     @raise errors.JobQueueDrainError: if the job queue is marked for draining
1666     @raise errors.JobQueueFull: if the job queue has too many jobs in it
1667     @raise errors.GenericError: If an opcode is not valid
1668
1669     """
1670     # Ok when sharing the big job queue lock, as the drain file is created when
1671     # the lock is exclusive.
1672     if self._drained:
1673       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1674
1675     if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1676       raise errors.JobQueueFull()
1677
1678     job = _QueuedJob(self, job_id, ops)
1679
1680     # Check priority
1681     for idx, op in enumerate(job.ops):
1682       if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
1683         allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
1684         raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
1685                                   " are %s" % (idx, op.priority, allowed))
1686
1687     # Write to disk
1688     self.UpdateJobUnlocked(job)
1689
1690     self._queue_size += 1
1691
1692     logging.debug("Adding new job %s to the cache", job_id)
1693     self._memcache[job_id] = job
1694
1695     return job
1696
1697   @locking.ssynchronized(_LOCK)
1698   @_RequireOpenQueue
1699   def SubmitJob(self, ops):
1700     """Create and store a new job.
1701
1702     @see: L{_SubmitJobUnlocked}
1703
1704     """
1705     job_id = self._NewSerialsUnlocked(1)[0]
1706     self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
1707     return job_id
1708
1709   @locking.ssynchronized(_LOCK)
1710   @_RequireOpenQueue
1711   def SubmitManyJobs(self, jobs):
1712     """Create and store multiple jobs.
1713
1714     @see: L{_SubmitJobUnlocked}
1715
1716     """
1717     results = []
1718     added_jobs = []
1719     all_job_ids = self._NewSerialsUnlocked(len(jobs))
1720     for job_id, ops in zip(all_job_ids, jobs):
1721       try:
1722         added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
1723         status = True
1724         data = job_id
1725       except errors.GenericError, err:
1726         data = str(err)
1727         status = False
1728       results.append((status, data))
1729
1730     self._EnqueueJobs(added_jobs)
1731
1732     return results
1733
1734   def _EnqueueJobs(self, jobs):
1735     """Helper function to add jobs to worker pool's queue.
1736
1737     @type jobs: list
1738     @param jobs: List of all jobs
1739
1740     """
1741     self._wpool.AddManyTasks([(job, ) for job in jobs],
1742                              priority=[job.CalcPriority() for job in jobs])
1743
1744   @_RequireOpenQueue
1745   def UpdateJobUnlocked(self, job, replicate=True):
1746     """Update a job's on disk storage.
1747
1748     After a job has been modified, this function needs to be called in
1749     order to write the changes to disk and replicate them to the other
1750     nodes.
1751
1752     @type job: L{_QueuedJob}
1753     @param job: the changed job
1754     @type replicate: boolean
1755     @param replicate: whether to replicate the change to remote nodes
1756
1757     """
1758     filename = self._GetJobPath(job.id)
1759     data = serializer.DumpJson(job.Serialize(), indent=False)
1760     logging.debug("Writing job %s to %s", job.id, filename)
1761     self._UpdateJobQueueFile(filename, data, replicate)
1762
1763   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1764                         timeout):
1765     """Waits for changes in a job.
1766
1767     @type job_id: string
1768     @param job_id: Job identifier
1769     @type fields: list of strings
1770     @param fields: Which fields to check for changes
1771     @type prev_job_info: list or None
1772     @param prev_job_info: Last job information returned
1773     @type prev_log_serial: int
1774     @param prev_log_serial: Last job message serial number
1775     @type timeout: float
1776     @param timeout: maximum time to wait in seconds
1777     @rtype: tuple (job info, log entries)
1778     @return: a tuple of the job information as required via
1779         the fields parameter, and the log entries as a list
1780
1781         if the job has not changed and the timeout has expired,
1782         we instead return a special value,
1783         L{constants.JOB_NOTCHANGED}, which should be interpreted
1784         as such by the clients
1785
1786     """
1787     load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)
1788
1789     helper = _WaitForJobChangesHelper()
1790
1791     return helper(self._GetJobPath(job_id), load_fn,
1792                   fields, prev_job_info, prev_log_serial, timeout)
1793
1794   @locking.ssynchronized(_LOCK)
1795   @_RequireOpenQueue
1796   def CancelJob(self, job_id):
1797     """Cancels a job.
1798
1799     This will only succeed if the job has not started yet.
1800
1801     @type job_id: string
1802     @param job_id: job ID of job to be cancelled.
1803
1804     """
1805     logging.info("Cancelling job %s", job_id)
1806
1807     job = self._LoadJobUnlocked(job_id)
1808     if not job:
1809       logging.debug("Job %s not found", job_id)
1810       return (False, "Job %s not found" % job_id)
1811
1812     (success, msg) = job.Cancel()
1813
1814     if success:
1815       self.UpdateJobUnlocked(job)
1816
1817     return (success, msg)
1818
1819   @_RequireOpenQueue
1820   def _ArchiveJobsUnlocked(self, jobs):
1821     """Archives jobs.
1822
1823     @type jobs: list of L{_QueuedJob}
1824     @param jobs: Job objects
1825     @rtype: int
1826     @return: Number of archived jobs
1827
1828     """
1829     archive_jobs = []
1830     rename_files = []
1831     for job in jobs:
1832       if job.CalcStatus() not in constants.JOBS_FINALIZED:
1833         logging.debug("Job %s is not yet done", job.id)
1834         continue
1835
1836       archive_jobs.append(job)
1837
1838       old = self._GetJobPath(job.id)
1839       new = self._GetArchivedJobPath(job.id)
1840       rename_files.append((old, new))
1841
1842     # TODO: What if 1..n files fail to rename?
1843     self._RenameFilesUnlocked(rename_files)
1844
1845     logging.debug("Successfully archived job(s) %s",
1846                   utils.CommaJoin(job.id for job in archive_jobs))
1847
1848     # Since we haven't quite checked, above, if we succeeded or failed renaming
1849     # the files, we update the cached queue size from the filesystem. When we
1850     # get around to fix the TODO: above, we can use the number of actually
1851     # archived jobs to fix this.
1852     self._UpdateQueueSizeUnlocked()
1853     return len(archive_jobs)
1854
1855   @locking.ssynchronized(_LOCK)
1856   @_RequireOpenQueue
1857   def ArchiveJob(self, job_id):
1858     """Archives a job.
1859
1860     This is just a wrapper over L{_ArchiveJobsUnlocked}.
1861
1862     @type job_id: string
1863     @param job_id: Job ID of job to be archived.
1864     @rtype: bool
1865     @return: Whether job was archived
1866
1867     """
1868     logging.info("Archiving job %s", job_id)
1869
1870     job = self._LoadJobUnlocked(job_id)
1871     if not job:
1872       logging.debug("Job %s not found", job_id)
1873       return False
1874
1875     return self._ArchiveJobsUnlocked([job]) == 1
1876
1877   @locking.ssynchronized(_LOCK)
1878   @_RequireOpenQueue
1879   def AutoArchiveJobs(self, age, timeout):
1880     """Archives all jobs based on age.
1881
1882     The method will archive all jobs which are older than the age
1883     parameter. For jobs that don't have an end timestamp, the start
1884     timestamp will be considered. The special '-1' age will cause
1885     archival of all jobs (that are not running or queued).
1886
1887     @type age: int
1888     @param age: the minimum age in seconds
1889
1890     """
1891     logging.info("Archiving jobs with age more than %s seconds", age)
1892
1893     now = time.time()
1894     end_time = now + timeout
1895     archived_count = 0
1896     last_touched = 0
1897
1898     all_job_ids = self._GetJobIDsUnlocked()
1899     pending = []
1900     for idx, job_id in enumerate(all_job_ids):
1901       last_touched = idx + 1
1902
1903       # Not optimal because jobs could be pending
1904       # TODO: Measure average duration for job archival and take number of
1905       # pending jobs into account.
1906       if time.time() > end_time:
1907         break
1908
1909       # Returns None if the job failed to load
1910       job = self._LoadJobUnlocked(job_id)
1911       if job:
1912         if job.end_timestamp is None:
1913           if job.start_timestamp is None:
1914             job_age = job.received_timestamp
1915           else:
1916             job_age = job.start_timestamp
1917         else:
1918           job_age = job.end_timestamp
1919
1920         if age == -1 or now - job_age[0] > age:
1921           pending.append(job)
1922
1923           # Archive 10 jobs at a time
1924           if len(pending) >= 10:
1925             archived_count += self._ArchiveJobsUnlocked(pending)
1926             pending = []
1927
1928     if pending:
1929       archived_count += self._ArchiveJobsUnlocked(pending)
1930
1931     return (archived_count, len(all_job_ids) - last_touched)
1932
1933   def QueryJobs(self, job_ids, fields):
1934     """Returns a list of jobs in queue.
1935
1936     @type job_ids: list
1937     @param job_ids: sequence of job identifiers or None for all
1938     @type fields: list
1939     @param fields: names of fields to return
1940     @rtype: list
1941     @return: list one element per job, each element being list with
1942         the requested fields
1943
1944     """
1945     jobs = []
1946     list_all = False
1947     if not job_ids:
1948       # Since files are added to/removed from the queue atomically, there's no
1949       # risk of getting the job ids in an inconsistent state.
1950       job_ids = self._GetJobIDsUnlocked()
1951       list_all = True
1952
1953     for job_id in job_ids:
1954       job = self.SafeLoadJobFromDisk(job_id)
1955       if job is not None:
1956         jobs.append(job.GetInfo(fields))
1957       elif not list_all:
1958         jobs.append(None)
1959
1960     return jobs
1961
1962   @locking.ssynchronized(_LOCK)
1963   @_RequireOpenQueue
1964   def Shutdown(self):
1965     """Stops the job queue.
1966
1967     This shutdowns all the worker threads an closes the queue.
1968
1969     """
1970     self._wpool.TerminateWorkers()
1971
1972     self._queue_filelock.Close()
1973     self._queue_filelock = None