jqueue: Fix potential race condition when cancelling queued jobs
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import os
33 import logging
34 import errno
35 import re
36 import time
37 import weakref
38
39 try:
40   # pylint: disable-msg=E0611
41   from pyinotify import pyinotify
42 except ImportError:
43   import pyinotify
44
45 from ganeti import asyncnotifier
46 from ganeti import constants
47 from ganeti import serializer
48 from ganeti import workerpool
49 from ganeti import locking
50 from ganeti import opcodes
51 from ganeti import errors
52 from ganeti import mcpu
53 from ganeti import utils
54 from ganeti import jstore
55 from ganeti import rpc
56 from ganeti import runtime
57 from ganeti import netutils
58 from ganeti import compat
59
60
61 JOBQUEUE_THREADS = 25
62 JOBS_PER_ARCHIVE_DIRECTORY = 10000
63
64 # member lock names to be passed to @ssynchronized decorator
65 _LOCK = "_lock"
66 _QUEUE = "_queue"
67
68
69 class CancelJob(Exception):
70   """Special exception to cancel a job.
71
72   """
73
74
75 def TimeStampNow():
76   """Returns the current timestamp.
77
78   @rtype: tuple
79   @return: the current time in the (seconds, microseconds) format
80
81   """
82   return utils.SplitTime(time.time())
83
84
85 class _QueuedOpCode(object):
86   """Encapsulates an opcode object.
87
88   @ivar log: holds the execution log and consists of tuples
89   of the form C{(log_serial, timestamp, level, message)}
90   @ivar input: the OpCode we encapsulate
91   @ivar status: the current status
92   @ivar result: the result of the LU execution
93   @ivar start_timestamp: timestamp for the start of the execution
94   @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
95   @ivar stop_timestamp: timestamp for the end of the execution
96
97   """
98   __slots__ = ["input", "status", "result", "log", "priority",
99                "start_timestamp", "exec_timestamp", "end_timestamp",
100                "__weakref__"]
101
102   def __init__(self, op):
103     """Constructor for the _QuededOpCode.
104
105     @type op: L{opcodes.OpCode}
106     @param op: the opcode we encapsulate
107
108     """
109     self.input = op
110     self.status = constants.OP_STATUS_QUEUED
111     self.result = None
112     self.log = []
113     self.start_timestamp = None
114     self.exec_timestamp = None
115     self.end_timestamp = None
116
117     # Get initial priority (it might change during the lifetime of this opcode)
118     self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
119
120   @classmethod
121   def Restore(cls, state):
122     """Restore the _QueuedOpCode from the serialized form.
123
124     @type state: dict
125     @param state: the serialized state
126     @rtype: _QueuedOpCode
127     @return: a new _QueuedOpCode instance
128
129     """
130     obj = _QueuedOpCode.__new__(cls)
131     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
132     obj.status = state["status"]
133     obj.result = state["result"]
134     obj.log = state["log"]
135     obj.start_timestamp = state.get("start_timestamp", None)
136     obj.exec_timestamp = state.get("exec_timestamp", None)
137     obj.end_timestamp = state.get("end_timestamp", None)
138     obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
139     return obj
140
141   def Serialize(self):
142     """Serializes this _QueuedOpCode.
143
144     @rtype: dict
145     @return: the dictionary holding the serialized state
146
147     """
148     return {
149       "input": self.input.__getstate__(),
150       "status": self.status,
151       "result": self.result,
152       "log": self.log,
153       "start_timestamp": self.start_timestamp,
154       "exec_timestamp": self.exec_timestamp,
155       "end_timestamp": self.end_timestamp,
156       "priority": self.priority,
157       }
158
159
160 class _QueuedJob(object):
161   """In-memory job representation.
162
163   This is what we use to track the user-submitted jobs. Locking must
164   be taken care of by users of this class.
165
166   @type queue: L{JobQueue}
167   @ivar queue: the parent queue
168   @ivar id: the job ID
169   @type ops: list
170   @ivar ops: the list of _QueuedOpCode that constitute the job
171   @type log_serial: int
172   @ivar log_serial: holds the index for the next log entry
173   @ivar received_timestamp: the timestamp for when the job was received
174   @ivar start_timestmap: the timestamp for start of execution
175   @ivar end_timestamp: the timestamp for end of execution
176
177   """
178   # pylint: disable-msg=W0212
179   __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
180                "received_timestamp", "start_timestamp", "end_timestamp",
181                "__weakref__"]
182
183   def __init__(self, queue, job_id, ops):
184     """Constructor for the _QueuedJob.
185
186     @type queue: L{JobQueue}
187     @param queue: our parent queue
188     @type job_id: job_id
189     @param job_id: our job id
190     @type ops: list
191     @param ops: the list of opcodes we hold, which will be encapsulated
192         in _QueuedOpCodes
193
194     """
195     if not ops:
196       raise errors.GenericError("A job needs at least one opcode")
197
198     self.queue = queue
199     self.id = job_id
200     self.ops = [_QueuedOpCode(op) for op in ops]
201     self.log_serial = 0
202     self.received_timestamp = TimeStampNow()
203     self.start_timestamp = None
204     self.end_timestamp = None
205
206     self._InitInMemory(self)
207
208   @staticmethod
209   def _InitInMemory(obj):
210     """Initializes in-memory variables.
211
212     """
213     obj.ops_iter = None
214     obj.cur_opctx = None
215
216   def __repr__(self):
217     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
218               "id=%s" % self.id,
219               "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
220
221     return "<%s at %#x>" % (" ".join(status), id(self))
222
223   @classmethod
224   def Restore(cls, queue, state):
225     """Restore a _QueuedJob from serialized state:
226
227     @type queue: L{JobQueue}
228     @param queue: to which queue the restored job belongs
229     @type state: dict
230     @param state: the serialized state
231     @rtype: _JobQueue
232     @return: the restored _JobQueue instance
233
234     """
235     obj = _QueuedJob.__new__(cls)
236     obj.queue = queue
237     obj.id = state["id"]
238     obj.received_timestamp = state.get("received_timestamp", None)
239     obj.start_timestamp = state.get("start_timestamp", None)
240     obj.end_timestamp = state.get("end_timestamp", None)
241
242     obj.ops = []
243     obj.log_serial = 0
244     for op_state in state["ops"]:
245       op = _QueuedOpCode.Restore(op_state)
246       for log_entry in op.log:
247         obj.log_serial = max(obj.log_serial, log_entry[0])
248       obj.ops.append(op)
249
250     cls._InitInMemory(obj)
251
252     return obj
253
254   def Serialize(self):
255     """Serialize the _JobQueue instance.
256
257     @rtype: dict
258     @return: the serialized state
259
260     """
261     return {
262       "id": self.id,
263       "ops": [op.Serialize() for op in self.ops],
264       "start_timestamp": self.start_timestamp,
265       "end_timestamp": self.end_timestamp,
266       "received_timestamp": self.received_timestamp,
267       }
268
269   def CalcStatus(self):
270     """Compute the status of this job.
271
272     This function iterates over all the _QueuedOpCodes in the job and
273     based on their status, computes the job status.
274
275     The algorithm is:
276       - if we find a cancelled, or finished with error, the job
277         status will be the same
278       - otherwise, the last opcode with the status one of:
279           - waitlock
280           - canceling
281           - running
282
283         will determine the job status
284
285       - otherwise, it means either all opcodes are queued, or success,
286         and the job status will be the same
287
288     @return: the job status
289
290     """
291     status = constants.JOB_STATUS_QUEUED
292
293     all_success = True
294     for op in self.ops:
295       if op.status == constants.OP_STATUS_SUCCESS:
296         continue
297
298       all_success = False
299
300       if op.status == constants.OP_STATUS_QUEUED:
301         pass
302       elif op.status == constants.OP_STATUS_WAITLOCK:
303         status = constants.JOB_STATUS_WAITLOCK
304       elif op.status == constants.OP_STATUS_RUNNING:
305         status = constants.JOB_STATUS_RUNNING
306       elif op.status == constants.OP_STATUS_CANCELING:
307         status = constants.JOB_STATUS_CANCELING
308         break
309       elif op.status == constants.OP_STATUS_ERROR:
310         status = constants.JOB_STATUS_ERROR
311         # The whole job fails if one opcode failed
312         break
313       elif op.status == constants.OP_STATUS_CANCELED:
314         status = constants.OP_STATUS_CANCELED
315         break
316
317     if all_success:
318       status = constants.JOB_STATUS_SUCCESS
319
320     return status
321
322   def CalcPriority(self):
323     """Gets the current priority for this job.
324
325     Only unfinished opcodes are considered. When all are done, the default
326     priority is used.
327
328     @rtype: int
329
330     """
331     priorities = [op.priority for op in self.ops
332                   if op.status not in constants.OPS_FINALIZED]
333
334     if not priorities:
335       # All opcodes are done, assume default priority
336       return constants.OP_PRIO_DEFAULT
337
338     return min(priorities)
339
340   def GetLogEntries(self, newer_than):
341     """Selectively returns the log entries.
342
343     @type newer_than: None or int
344     @param newer_than: if this is None, return all log entries,
345         otherwise return only the log entries with serial higher
346         than this value
347     @rtype: list
348     @return: the list of the log entries selected
349
350     """
351     if newer_than is None:
352       serial = -1
353     else:
354       serial = newer_than
355
356     entries = []
357     for op in self.ops:
358       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
359
360     return entries
361
362   def GetInfo(self, fields):
363     """Returns information about a job.
364
365     @type fields: list
366     @param fields: names of fields to return
367     @rtype: list
368     @return: list with one element for each field
369     @raise errors.OpExecError: when an invalid field
370         has been passed
371
372     """
373     row = []
374     for fname in fields:
375       if fname == "id":
376         row.append(self.id)
377       elif fname == "status":
378         row.append(self.CalcStatus())
379       elif fname == "priority":
380         row.append(self.CalcPriority())
381       elif fname == "ops":
382         row.append([op.input.__getstate__() for op in self.ops])
383       elif fname == "opresult":
384         row.append([op.result for op in self.ops])
385       elif fname == "opstatus":
386         row.append([op.status for op in self.ops])
387       elif fname == "oplog":
388         row.append([op.log for op in self.ops])
389       elif fname == "opstart":
390         row.append([op.start_timestamp for op in self.ops])
391       elif fname == "opexec":
392         row.append([op.exec_timestamp for op in self.ops])
393       elif fname == "opend":
394         row.append([op.end_timestamp for op in self.ops])
395       elif fname == "oppriority":
396         row.append([op.priority for op in self.ops])
397       elif fname == "received_ts":
398         row.append(self.received_timestamp)
399       elif fname == "start_ts":
400         row.append(self.start_timestamp)
401       elif fname == "end_ts":
402         row.append(self.end_timestamp)
403       elif fname == "summary":
404         row.append([op.input.Summary() for op in self.ops])
405       else:
406         raise errors.OpExecError("Invalid self query field '%s'" % fname)
407     return row
408
409   def MarkUnfinishedOps(self, status, result):
410     """Mark unfinished opcodes with a given status and result.
411
412     This is an utility function for marking all running or waiting to
413     be run opcodes with a given status. Opcodes which are already
414     finalised are not changed.
415
416     @param status: a given opcode status
417     @param result: the opcode result
418
419     """
420     not_marked = True
421     for op in self.ops:
422       if op.status in constants.OPS_FINALIZED:
423         assert not_marked, "Finalized opcodes found after non-finalized ones"
424         continue
425       op.status = status
426       op.result = result
427       not_marked = False
428
429   def Finalize(self):
430     """Marks the job as finalized.
431
432     """
433     self.end_timestamp = TimeStampNow()
434
435   def Cancel(self):
436     """Marks job as canceled/-ing if possible.
437
438     @rtype: tuple; (bool, string)
439     @return: Boolean describing whether job was successfully canceled or marked
440       as canceling and a text message
441
442     """
443     status = self.CalcStatus()
444
445     if status == constants.JOB_STATUS_QUEUED:
446       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
447                              "Job canceled by request")
448       self.Finalize()
449       return (True, "Job %s canceled" % self.id)
450
451     elif status == constants.JOB_STATUS_WAITLOCK:
452       # The worker will notice the new status and cancel the job
453       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
454       return (True, "Job %s will be canceled" % self.id)
455
456     else:
457       logging.debug("Job %s is no longer waiting in the queue", self.id)
458       return (False, "Job %s is no longer waiting in the queue" % self.id)
459
460
461 class _OpExecCallbacks(mcpu.OpExecCbBase):
462   def __init__(self, queue, job, op):
463     """Initializes this class.
464
465     @type queue: L{JobQueue}
466     @param queue: Job queue
467     @type job: L{_QueuedJob}
468     @param job: Job object
469     @type op: L{_QueuedOpCode}
470     @param op: OpCode
471
472     """
473     assert queue, "Queue is missing"
474     assert job, "Job is missing"
475     assert op, "Opcode is missing"
476
477     self._queue = queue
478     self._job = job
479     self._op = op
480
481   def _CheckCancel(self):
482     """Raises an exception to cancel the job if asked to.
483
484     """
485     # Cancel here if we were asked to
486     if self._op.status == constants.OP_STATUS_CANCELING:
487       logging.debug("Canceling opcode")
488       raise CancelJob()
489
490   @locking.ssynchronized(_QUEUE, shared=1)
491   def NotifyStart(self):
492     """Mark the opcode as running, not lock-waiting.
493
494     This is called from the mcpu code as a notifier function, when the LU is
495     finally about to start the Exec() method. Of course, to have end-user
496     visible results, the opcode must be initially (before calling into
497     Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
498
499     """
500     assert self._op in self._job.ops
501     assert self._op.status in (constants.OP_STATUS_WAITLOCK,
502                                constants.OP_STATUS_CANCELING)
503
504     # Cancel here if we were asked to
505     self._CheckCancel()
506
507     logging.debug("Opcode is now running")
508
509     self._op.status = constants.OP_STATUS_RUNNING
510     self._op.exec_timestamp = TimeStampNow()
511
512     # And finally replicate the job status
513     self._queue.UpdateJobUnlocked(self._job)
514
515   @locking.ssynchronized(_QUEUE, shared=1)
516   def _AppendFeedback(self, timestamp, log_type, log_msg):
517     """Internal feedback append function, with locks
518
519     """
520     self._job.log_serial += 1
521     self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
522     self._queue.UpdateJobUnlocked(self._job, replicate=False)
523
524   def Feedback(self, *args):
525     """Append a log entry.
526
527     """
528     assert len(args) < 3
529
530     if len(args) == 1:
531       log_type = constants.ELOG_MESSAGE
532       log_msg = args[0]
533     else:
534       (log_type, log_msg) = args
535
536     # The time is split to make serialization easier and not lose
537     # precision.
538     timestamp = utils.SplitTime(time.time())
539     self._AppendFeedback(timestamp, log_type, log_msg)
540
541   def CheckCancel(self):
542     """Check whether job has been cancelled.
543
544     """
545     assert self._op.status in (constants.OP_STATUS_WAITLOCK,
546                                constants.OP_STATUS_CANCELING)
547
548     # Cancel here if we were asked to
549     self._CheckCancel()
550
551
552 class _JobChangesChecker(object):
553   def __init__(self, fields, prev_job_info, prev_log_serial):
554     """Initializes this class.
555
556     @type fields: list of strings
557     @param fields: Fields requested by LUXI client
558     @type prev_job_info: string
559     @param prev_job_info: previous job info, as passed by the LUXI client
560     @type prev_log_serial: string
561     @param prev_log_serial: previous job serial, as passed by the LUXI client
562
563     """
564     self._fields = fields
565     self._prev_job_info = prev_job_info
566     self._prev_log_serial = prev_log_serial
567
568   def __call__(self, job):
569     """Checks whether job has changed.
570
571     @type job: L{_QueuedJob}
572     @param job: Job object
573
574     """
575     status = job.CalcStatus()
576     job_info = job.GetInfo(self._fields)
577     log_entries = job.GetLogEntries(self._prev_log_serial)
578
579     # Serializing and deserializing data can cause type changes (e.g. from
580     # tuple to list) or precision loss. We're doing it here so that we get
581     # the same modifications as the data received from the client. Without
582     # this, the comparison afterwards might fail without the data being
583     # significantly different.
584     # TODO: we just deserialized from disk, investigate how to make sure that
585     # the job info and log entries are compatible to avoid this further step.
586     # TODO: Doing something like in testutils.py:UnifyValueType might be more
587     # efficient, though floats will be tricky
588     job_info = serializer.LoadJson(serializer.DumpJson(job_info))
589     log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
590
591     # Don't even try to wait if the job is no longer running, there will be
592     # no changes.
593     if (status not in (constants.JOB_STATUS_QUEUED,
594                        constants.JOB_STATUS_RUNNING,
595                        constants.JOB_STATUS_WAITLOCK) or
596         job_info != self._prev_job_info or
597         (log_entries and self._prev_log_serial != log_entries[0][0])):
598       logging.debug("Job %s changed", job.id)
599       return (job_info, log_entries)
600
601     return None
602
603
604 class _JobFileChangesWaiter(object):
605   def __init__(self, filename):
606     """Initializes this class.
607
608     @type filename: string
609     @param filename: Path to job file
610     @raises errors.InotifyError: if the notifier cannot be setup
611
612     """
613     self._wm = pyinotify.WatchManager()
614     self._inotify_handler = \
615       asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
616     self._notifier = \
617       pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
618     try:
619       self._inotify_handler.enable()
620     except Exception:
621       # pyinotify doesn't close file descriptors automatically
622       self._notifier.stop()
623       raise
624
625   def _OnInotify(self, notifier_enabled):
626     """Callback for inotify.
627
628     """
629     if not notifier_enabled:
630       self._inotify_handler.enable()
631
632   def Wait(self, timeout):
633     """Waits for the job file to change.
634
635     @type timeout: float
636     @param timeout: Timeout in seconds
637     @return: Whether there have been events
638
639     """
640     assert timeout >= 0
641     have_events = self._notifier.check_events(timeout * 1000)
642     if have_events:
643       self._notifier.read_events()
644     self._notifier.process_events()
645     return have_events
646
647   def Close(self):
648     """Closes underlying notifier and its file descriptor.
649
650     """
651     self._notifier.stop()
652
653
654 class _JobChangesWaiter(object):
655   def __init__(self, filename):
656     """Initializes this class.
657
658     @type filename: string
659     @param filename: Path to job file
660
661     """
662     self._filewaiter = None
663     self._filename = filename
664
665   def Wait(self, timeout):
666     """Waits for a job to change.
667
668     @type timeout: float
669     @param timeout: Timeout in seconds
670     @return: Whether there have been events
671
672     """
673     if self._filewaiter:
674       return self._filewaiter.Wait(timeout)
675
676     # Lazy setup: Avoid inotify setup cost when job file has already changed.
677     # If this point is reached, return immediately and let caller check the job
678     # file again in case there were changes since the last check. This avoids a
679     # race condition.
680     self._filewaiter = _JobFileChangesWaiter(self._filename)
681
682     return True
683
684   def Close(self):
685     """Closes underlying waiter.
686
687     """
688     if self._filewaiter:
689       self._filewaiter.Close()
690
691
692 class _WaitForJobChangesHelper(object):
693   """Helper class using inotify to wait for changes in a job file.
694
695   This class takes a previous job status and serial, and alerts the client when
696   the current job status has changed.
697
698   """
699   @staticmethod
700   def _CheckForChanges(job_load_fn, check_fn):
701     job = job_load_fn()
702     if not job:
703       raise errors.JobLost()
704
705     result = check_fn(job)
706     if result is None:
707       raise utils.RetryAgain()
708
709     return result
710
711   def __call__(self, filename, job_load_fn,
712                fields, prev_job_info, prev_log_serial, timeout):
713     """Waits for changes on a job.
714
715     @type filename: string
716     @param filename: File on which to wait for changes
717     @type job_load_fn: callable
718     @param job_load_fn: Function to load job
719     @type fields: list of strings
720     @param fields: Which fields to check for changes
721     @type prev_job_info: list or None
722     @param prev_job_info: Last job information returned
723     @type prev_log_serial: int
724     @param prev_log_serial: Last job message serial number
725     @type timeout: float
726     @param timeout: maximum time to wait in seconds
727
728     """
729     try:
730       check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
731       waiter = _JobChangesWaiter(filename)
732       try:
733         return utils.Retry(compat.partial(self._CheckForChanges,
734                                           job_load_fn, check_fn),
735                            utils.RETRY_REMAINING_TIME, timeout,
736                            wait_fn=waiter.Wait)
737       finally:
738         waiter.Close()
739     except (errors.InotifyError, errors.JobLost):
740       return None
741     except utils.RetryTimeout:
742       return constants.JOB_NOTCHANGED
743
744
745 def _EncodeOpError(err):
746   """Encodes an error which occurred while processing an opcode.
747
748   """
749   if isinstance(err, errors.GenericError):
750     to_encode = err
751   else:
752     to_encode = errors.OpExecError(str(err))
753
754   return errors.EncodeException(to_encode)
755
756
757 class _TimeoutStrategyWrapper:
758   def __init__(self, fn):
759     """Initializes this class.
760
761     """
762     self._fn = fn
763     self._next = None
764
765   def _Advance(self):
766     """Gets the next timeout if necessary.
767
768     """
769     if self._next is None:
770       self._next = self._fn()
771
772   def Peek(self):
773     """Returns the next timeout.
774
775     """
776     self._Advance()
777     return self._next
778
779   def Next(self):
780     """Returns the current timeout and advances the internal state.
781
782     """
783     self._Advance()
784     result = self._next
785     self._next = None
786     return result
787
788
789 class _OpExecContext:
790   def __init__(self, op, index, log_prefix, timeout_strategy_factory):
791     """Initializes this class.
792
793     """
794     self.op = op
795     self.index = index
796     self.log_prefix = log_prefix
797     self.summary = op.input.Summary()
798
799     self._timeout_strategy_factory = timeout_strategy_factory
800     self._ResetTimeoutStrategy()
801
802   def _ResetTimeoutStrategy(self):
803     """Creates a new timeout strategy.
804
805     """
806     self._timeout_strategy = \
807       _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
808
809   def CheckPriorityIncrease(self):
810     """Checks whether priority can and should be increased.
811
812     Called when locks couldn't be acquired.
813
814     """
815     op = self.op
816
817     # Exhausted all retries and next round should not use blocking acquire
818     # for locks?
819     if (self._timeout_strategy.Peek() is None and
820         op.priority > constants.OP_PRIO_HIGHEST):
821       logging.debug("Increasing priority")
822       op.priority -= 1
823       self._ResetTimeoutStrategy()
824       return True
825
826     return False
827
828   def GetNextLockTimeout(self):
829     """Returns the next lock acquire timeout.
830
831     """
832     return self._timeout_strategy.Next()
833
834
835 class _JobProcessor(object):
836   def __init__(self, queue, opexec_fn, job,
837                _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
838     """Initializes this class.
839
840     """
841     self.queue = queue
842     self.opexec_fn = opexec_fn
843     self.job = job
844     self._timeout_strategy_factory = _timeout_strategy_factory
845
846   @staticmethod
847   def _FindNextOpcode(job, timeout_strategy_factory):
848     """Locates the next opcode to run.
849
850     @type job: L{_QueuedJob}
851     @param job: Job object
852     @param timeout_strategy_factory: Callable to create new timeout strategy
853
854     """
855     # Create some sort of a cache to speed up locating next opcode for future
856     # lookups
857     # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
858     # pending and one for processed ops.
859     if job.ops_iter is None:
860       job.ops_iter = enumerate(job.ops)
861
862     # Find next opcode to run
863     while True:
864       try:
865         (idx, op) = job.ops_iter.next()
866       except StopIteration:
867         raise errors.ProgrammerError("Called for a finished job")
868
869       if op.status == constants.OP_STATUS_RUNNING:
870         # Found an opcode already marked as running
871         raise errors.ProgrammerError("Called for job marked as running")
872
873       opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
874                              timeout_strategy_factory)
875
876       if op.status not in constants.OPS_FINALIZED:
877         return opctx
878
879       # This is a job that was partially completed before master daemon
880       # shutdown, so it can be expected that some opcodes are already
881       # completed successfully (if any did error out, then the whole job
882       # should have been aborted and not resubmitted for processing).
883       logging.info("%s: opcode %s already processed, skipping",
884                    opctx.log_prefix, opctx.summary)
885
886   @staticmethod
887   def _MarkWaitlock(job, op):
888     """Marks an opcode as waiting for locks.
889
890     The job's start timestamp is also set if necessary.
891
892     @type job: L{_QueuedJob}
893     @param job: Job object
894     @type op: L{_QueuedOpCode}
895     @param op: Opcode object
896
897     """
898     assert op in job.ops
899     assert op.status in (constants.OP_STATUS_QUEUED,
900                          constants.OP_STATUS_WAITLOCK)
901
902     update = False
903
904     op.result = None
905
906     if op.status == constants.OP_STATUS_QUEUED:
907       op.status = constants.OP_STATUS_WAITLOCK
908       update = True
909
910     if op.start_timestamp is None:
911       op.start_timestamp = TimeStampNow()
912       update = True
913
914     if job.start_timestamp is None:
915       job.start_timestamp = op.start_timestamp
916       update = True
917
918     assert op.status == constants.OP_STATUS_WAITLOCK
919
920     return update
921
922   def _ExecOpCodeUnlocked(self, opctx):
923     """Processes one opcode and returns the result.
924
925     """
926     op = opctx.op
927
928     assert op.status == constants.OP_STATUS_WAITLOCK
929
930     timeout = opctx.GetNextLockTimeout()
931
932     try:
933       # Make sure not to hold queue lock while calling ExecOpCode
934       result = self.opexec_fn(op.input,
935                               _OpExecCallbacks(self.queue, self.job, op),
936                               timeout=timeout, priority=op.priority)
937     except mcpu.LockAcquireTimeout:
938       assert timeout is not None, "Received timeout for blocking acquire"
939       logging.debug("Couldn't acquire locks in %0.6fs", timeout)
940
941       assert op.status in (constants.OP_STATUS_WAITLOCK,
942                            constants.OP_STATUS_CANCELING)
943
944       # Was job cancelled while we were waiting for the lock?
945       if op.status == constants.OP_STATUS_CANCELING:
946         return (constants.OP_STATUS_CANCELING, None)
947
948       # Stay in waitlock while trying to re-acquire lock
949       return (constants.OP_STATUS_WAITLOCK, None)
950     except CancelJob:
951       logging.exception("%s: Canceling job", opctx.log_prefix)
952       assert op.status == constants.OP_STATUS_CANCELING
953       return (constants.OP_STATUS_CANCELING, None)
954     except Exception, err: # pylint: disable-msg=W0703
955       logging.exception("%s: Caught exception in %s",
956                         opctx.log_prefix, opctx.summary)
957       return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
958     else:
959       logging.debug("%s: %s successful",
960                     opctx.log_prefix, opctx.summary)
961       return (constants.OP_STATUS_SUCCESS, result)
962
963   def __call__(self, _nextop_fn=None):
964     """Continues execution of a job.
965
966     @param _nextop_fn: Callback function for tests
967     @rtype: bool
968     @return: True if job is finished, False if processor needs to be called
969              again
970
971     """
972     queue = self.queue
973     job = self.job
974
975     logging.debug("Processing job %s", job.id)
976
977     queue.acquire(shared=1)
978     try:
979       opcount = len(job.ops)
980
981       # Don't do anything for finalized jobs
982       if job.CalcStatus() in constants.JOBS_FINALIZED:
983         return True
984
985       # Is a previous opcode still pending?
986       if job.cur_opctx:
987         opctx = job.cur_opctx
988         job.cur_opctx = None
989       else:
990         if __debug__ and _nextop_fn:
991           _nextop_fn()
992         opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
993
994       op = opctx.op
995
996       # Consistency check
997       assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
998                                      constants.OP_STATUS_CANCELING)
999                         for i in job.ops[opctx.index + 1:])
1000
1001       assert op.status in (constants.OP_STATUS_QUEUED,
1002                            constants.OP_STATUS_WAITLOCK,
1003                            constants.OP_STATUS_CANCELING)
1004
1005       assert (op.priority <= constants.OP_PRIO_LOWEST and
1006               op.priority >= constants.OP_PRIO_HIGHEST)
1007
1008       if op.status != constants.OP_STATUS_CANCELING:
1009         assert op.status in (constants.OP_STATUS_QUEUED,
1010                              constants.OP_STATUS_WAITLOCK)
1011
1012         # Prepare to start opcode
1013         if self._MarkWaitlock(job, op):
1014           # Write to disk
1015           queue.UpdateJobUnlocked(job)
1016
1017         assert op.status == constants.OP_STATUS_WAITLOCK
1018         assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1019         assert job.start_timestamp and op.start_timestamp
1020
1021         logging.info("%s: opcode %s waiting for locks",
1022                      opctx.log_prefix, opctx.summary)
1023
1024         queue.release()
1025         try:
1026           (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1027         finally:
1028           queue.acquire(shared=1)
1029
1030         op.status = op_status
1031         op.result = op_result
1032
1033         if op.status == constants.OP_STATUS_WAITLOCK:
1034           # Couldn't get locks in time
1035           assert not op.end_timestamp
1036         else:
1037           # Finalize opcode
1038           op.end_timestamp = TimeStampNow()
1039
1040           if op.status == constants.OP_STATUS_CANCELING:
1041             assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1042                                   for i in job.ops[opctx.index:])
1043           else:
1044             assert op.status in constants.OPS_FINALIZED
1045
1046       if op.status == constants.OP_STATUS_WAITLOCK:
1047         finalize = False
1048
1049         if opctx.CheckPriorityIncrease():
1050           # Priority was changed, need to update on-disk file
1051           queue.UpdateJobUnlocked(job)
1052
1053         # Keep around for another round
1054         job.cur_opctx = opctx
1055
1056         assert (op.priority <= constants.OP_PRIO_LOWEST and
1057                 op.priority >= constants.OP_PRIO_HIGHEST)
1058
1059         # In no case must the status be finalized here
1060         assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1061
1062       else:
1063         # Ensure all opcodes so far have been successful
1064         assert (opctx.index == 0 or
1065                 compat.all(i.status == constants.OP_STATUS_SUCCESS
1066                            for i in job.ops[:opctx.index]))
1067
1068         # Reset context
1069         job.cur_opctx = None
1070
1071         if op.status == constants.OP_STATUS_SUCCESS:
1072           finalize = False
1073
1074         elif op.status == constants.OP_STATUS_ERROR:
1075           # Ensure failed opcode has an exception as its result
1076           assert errors.GetEncodedError(job.ops[opctx.index].result)
1077
1078           to_encode = errors.OpExecError("Preceding opcode failed")
1079           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1080                                 _EncodeOpError(to_encode))
1081           finalize = True
1082
1083           # Consistency check
1084           assert compat.all(i.status == constants.OP_STATUS_ERROR and
1085                             errors.GetEncodedError(i.result)
1086                             for i in job.ops[opctx.index:])
1087
1088         elif op.status == constants.OP_STATUS_CANCELING:
1089           job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1090                                 "Job canceled by request")
1091           finalize = True
1092
1093         else:
1094           raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1095
1096         if opctx.index == (opcount - 1):
1097           # Finalize on last opcode
1098           finalize = True
1099
1100         if finalize:
1101           # All opcodes have been run, finalize job
1102           job.Finalize()
1103
1104         # Write to disk. If the job status is final, this is the final write
1105         # allowed. Once the file has been written, it can be archived anytime.
1106         queue.UpdateJobUnlocked(job)
1107
1108         if finalize:
1109           logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1110           return True
1111
1112       return False
1113     finally:
1114       queue.release()
1115
1116
1117 class _JobQueueWorker(workerpool.BaseWorker):
1118   """The actual job workers.
1119
1120   """
1121   def RunTask(self, job): # pylint: disable-msg=W0221
1122     """Job executor.
1123
1124     This functions processes a job. It is closely tied to the L{_QueuedJob} and
1125     L{_QueuedOpCode} classes.
1126
1127     @type job: L{_QueuedJob}
1128     @param job: the job to be processed
1129
1130     """
1131     queue = job.queue
1132     assert queue == self.pool.queue
1133
1134     self.SetTaskName("Job%s" % job.id)
1135
1136     proc = mcpu.Processor(queue.context, job.id)
1137
1138     if not _JobProcessor(queue, proc.ExecOpCode, job)():
1139       # Schedule again
1140       raise workerpool.DeferTask(priority=job.CalcPriority())
1141
1142
1143 class _JobQueueWorkerPool(workerpool.WorkerPool):
1144   """Simple class implementing a job-processing workerpool.
1145
1146   """
1147   def __init__(self, queue):
1148     super(_JobQueueWorkerPool, self).__init__("JobQueue",
1149                                               JOBQUEUE_THREADS,
1150                                               _JobQueueWorker)
1151     self.queue = queue
1152
1153
1154 def _RequireOpenQueue(fn):
1155   """Decorator for "public" functions.
1156
1157   This function should be used for all 'public' functions. That is,
1158   functions usually called from other classes. Note that this should
1159   be applied only to methods (not plain functions), since it expects
1160   that the decorated function is called with a first argument that has
1161   a '_queue_filelock' argument.
1162
1163   @warning: Use this decorator only after locking.ssynchronized
1164
1165   Example::
1166     @locking.ssynchronized(_LOCK)
1167     @_RequireOpenQueue
1168     def Example(self):
1169       pass
1170
1171   """
1172   def wrapper(self, *args, **kwargs):
1173     # pylint: disable-msg=W0212
1174     assert self._queue_filelock is not None, "Queue should be open"
1175     return fn(self, *args, **kwargs)
1176   return wrapper
1177
1178
1179 class JobQueue(object):
1180   """Queue used to manage the jobs.
1181
1182   @cvar _RE_JOB_FILE: regex matching the valid job file names
1183
1184   """
1185   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
1186
1187   def __init__(self, context):
1188     """Constructor for JobQueue.
1189
1190     The constructor will initialize the job queue object and then
1191     start loading the current jobs from disk, either for starting them
1192     (if they were queue) or for aborting them (if they were already
1193     running).
1194
1195     @type context: GanetiContext
1196     @param context: the context object for access to the configuration
1197         data and other ganeti objects
1198
1199     """
1200     self.context = context
1201     self._memcache = weakref.WeakValueDictionary()
1202     self._my_hostname = netutils.Hostname.GetSysName()
1203
1204     # The Big JobQueue lock. If a code block or method acquires it in shared
1205     # mode safe it must guarantee concurrency with all the code acquiring it in
1206     # shared mode, including itself. In order not to acquire it at all
1207     # concurrency must be guaranteed with all code acquiring it in shared mode
1208     # and all code acquiring it exclusively.
1209     self._lock = locking.SharedLock("JobQueue")
1210
1211     self.acquire = self._lock.acquire
1212     self.release = self._lock.release
1213
1214     # Initialize the queue, and acquire the filelock.
1215     # This ensures no other process is working on the job queue.
1216     self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1217
1218     # Read serial file
1219     self._last_serial = jstore.ReadSerial()
1220     assert self._last_serial is not None, ("Serial file was modified between"
1221                                            " check in jstore and here")
1222
1223     # Get initial list of nodes
1224     self._nodes = dict((n.name, n.primary_ip)
1225                        for n in self.context.cfg.GetAllNodesInfo().values()
1226                        if n.master_candidate)
1227
1228     # Remove master node
1229     self._nodes.pop(self._my_hostname, None)
1230
1231     # TODO: Check consistency across nodes
1232
1233     self._queue_size = 0
1234     self._UpdateQueueSizeUnlocked()
1235     self._drained = self._IsQueueMarkedDrain()
1236
1237     # Setup worker pool
1238     self._wpool = _JobQueueWorkerPool(self)
1239     try:
1240       self._InspectQueue()
1241     except:
1242       self._wpool.TerminateWorkers()
1243       raise
1244
1245   @locking.ssynchronized(_LOCK)
1246   @_RequireOpenQueue
1247   def _InspectQueue(self):
1248     """Loads the whole job queue and resumes unfinished jobs.
1249
1250     This function needs the lock here because WorkerPool.AddTask() may start a
1251     job while we're still doing our work.
1252
1253     """
1254     logging.info("Inspecting job queue")
1255
1256     restartjobs = []
1257
1258     all_job_ids = self._GetJobIDsUnlocked()
1259     jobs_count = len(all_job_ids)
1260     lastinfo = time.time()
1261     for idx, job_id in enumerate(all_job_ids):
1262       # Give an update every 1000 jobs or 10 seconds
1263       if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1264           idx == (jobs_count - 1)):
1265         logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1266                      idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1267         lastinfo = time.time()
1268
1269       job = self._LoadJobUnlocked(job_id)
1270
1271       # a failure in loading the job can cause 'None' to be returned
1272       if job is None:
1273         continue
1274
1275       status = job.CalcStatus()
1276
1277       if status == constants.JOB_STATUS_QUEUED:
1278         restartjobs.append(job)
1279
1280       elif status in (constants.JOB_STATUS_RUNNING,
1281                       constants.JOB_STATUS_WAITLOCK,
1282                       constants.JOB_STATUS_CANCELING):
1283         logging.warning("Unfinished job %s found: %s", job.id, job)
1284
1285         if status == constants.JOB_STATUS_WAITLOCK:
1286           # Restart job
1287           job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1288           restartjobs.append(job)
1289         else:
1290           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1291                                 "Unclean master daemon shutdown")
1292
1293         self.UpdateJobUnlocked(job)
1294
1295     if restartjobs:
1296       logging.info("Restarting %s jobs", len(restartjobs))
1297       self._EnqueueJobs(restartjobs)
1298
1299     logging.info("Job queue inspection finished")
1300
1301   @locking.ssynchronized(_LOCK)
1302   @_RequireOpenQueue
1303   def AddNode(self, node):
1304     """Register a new node with the queue.
1305
1306     @type node: L{objects.Node}
1307     @param node: the node object to be added
1308
1309     """
1310     node_name = node.name
1311     assert node_name != self._my_hostname
1312
1313     # Clean queue directory on added node
1314     result = rpc.RpcRunner.call_jobqueue_purge(node_name)
1315     msg = result.fail_msg
1316     if msg:
1317       logging.warning("Cannot cleanup queue directory on node %s: %s",
1318                       node_name, msg)
1319
1320     if not node.master_candidate:
1321       # remove if existing, ignoring errors
1322       self._nodes.pop(node_name, None)
1323       # and skip the replication of the job ids
1324       return
1325
1326     # Upload the whole queue excluding archived jobs
1327     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1328
1329     # Upload current serial file
1330     files.append(constants.JOB_QUEUE_SERIAL_FILE)
1331
1332     for file_name in files:
1333       # Read file content
1334       content = utils.ReadFile(file_name)
1335
1336       result = rpc.RpcRunner.call_jobqueue_update([node_name],
1337                                                   [node.primary_ip],
1338                                                   file_name, content)
1339       msg = result[node_name].fail_msg
1340       if msg:
1341         logging.error("Failed to upload file %s to node %s: %s",
1342                       file_name, node_name, msg)
1343
1344     self._nodes[node_name] = node.primary_ip
1345
1346   @locking.ssynchronized(_LOCK)
1347   @_RequireOpenQueue
1348   def RemoveNode(self, node_name):
1349     """Callback called when removing nodes from the cluster.
1350
1351     @type node_name: str
1352     @param node_name: the name of the node to remove
1353
1354     """
1355     self._nodes.pop(node_name, None)
1356
1357   @staticmethod
1358   def _CheckRpcResult(result, nodes, failmsg):
1359     """Verifies the status of an RPC call.
1360
1361     Since we aim to keep consistency should this node (the current
1362     master) fail, we will log errors if our rpc fail, and especially
1363     log the case when more than half of the nodes fails.
1364
1365     @param result: the data as returned from the rpc call
1366     @type nodes: list
1367     @param nodes: the list of nodes we made the call to
1368     @type failmsg: str
1369     @param failmsg: the identifier to be used for logging
1370
1371     """
1372     failed = []
1373     success = []
1374
1375     for node in nodes:
1376       msg = result[node].fail_msg
1377       if msg:
1378         failed.append(node)
1379         logging.error("RPC call %s (%s) failed on node %s: %s",
1380                       result[node].call, failmsg, node, msg)
1381       else:
1382         success.append(node)
1383
1384     # +1 for the master node
1385     if (len(success) + 1) < len(failed):
1386       # TODO: Handle failing nodes
1387       logging.error("More than half of the nodes failed")
1388
1389   def _GetNodeIp(self):
1390     """Helper for returning the node name/ip list.
1391
1392     @rtype: (list, list)
1393     @return: a tuple of two lists, the first one with the node
1394         names and the second one with the node addresses
1395
1396     """
1397     # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1398     name_list = self._nodes.keys()
1399     addr_list = [self._nodes[name] for name in name_list]
1400     return name_list, addr_list
1401
1402   def _UpdateJobQueueFile(self, file_name, data, replicate):
1403     """Writes a file locally and then replicates it to all nodes.
1404
1405     This function will replace the contents of a file on the local
1406     node and then replicate it to all the other nodes we have.
1407
1408     @type file_name: str
1409     @param file_name: the path of the file to be replicated
1410     @type data: str
1411     @param data: the new contents of the file
1412     @type replicate: boolean
1413     @param replicate: whether to spread the changes to the remote nodes
1414
1415     """
1416     getents = runtime.GetEnts()
1417     utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1418                     gid=getents.masterd_gid)
1419
1420     if replicate:
1421       names, addrs = self._GetNodeIp()
1422       result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1423       self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1424
1425   def _RenameFilesUnlocked(self, rename):
1426     """Renames a file locally and then replicate the change.
1427
1428     This function will rename a file in the local queue directory
1429     and then replicate this rename to all the other nodes we have.
1430
1431     @type rename: list of (old, new)
1432     @param rename: List containing tuples mapping old to new names
1433
1434     """
1435     # Rename them locally
1436     for old, new in rename:
1437       utils.RenameFile(old, new, mkdir=True)
1438
1439     # ... and on all nodes
1440     names, addrs = self._GetNodeIp()
1441     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1442     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1443
1444   @staticmethod
1445   def _FormatJobID(job_id):
1446     """Convert a job ID to string format.
1447
1448     Currently this just does C{str(job_id)} after performing some
1449     checks, but if we want to change the job id format this will
1450     abstract this change.
1451
1452     @type job_id: int or long
1453     @param job_id: the numeric job id
1454     @rtype: str
1455     @return: the formatted job id
1456
1457     """
1458     if not isinstance(job_id, (int, long)):
1459       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1460     if job_id < 0:
1461       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1462
1463     return str(job_id)
1464
1465   @classmethod
1466   def _GetArchiveDirectory(cls, job_id):
1467     """Returns the archive directory for a job.
1468
1469     @type job_id: str
1470     @param job_id: Job identifier
1471     @rtype: str
1472     @return: Directory name
1473
1474     """
1475     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
1476
1477   def _NewSerialsUnlocked(self, count):
1478     """Generates a new job identifier.
1479
1480     Job identifiers are unique during the lifetime of a cluster.
1481
1482     @type count: integer
1483     @param count: how many serials to return
1484     @rtype: str
1485     @return: a string representing the job identifier.
1486
1487     """
1488     assert count > 0
1489     # New number
1490     serial = self._last_serial + count
1491
1492     # Write to file
1493     self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1494                              "%s\n" % serial, True)
1495
1496     result = [self._FormatJobID(v)
1497               for v in range(self._last_serial, serial + 1)]
1498     # Keep it only if we were able to write the file
1499     self._last_serial = serial
1500
1501     return result
1502
1503   @staticmethod
1504   def _GetJobPath(job_id):
1505     """Returns the job file for a given job id.
1506
1507     @type job_id: str
1508     @param job_id: the job identifier
1509     @rtype: str
1510     @return: the path to the job file
1511
1512     """
1513     return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1514
1515   @classmethod
1516   def _GetArchivedJobPath(cls, job_id):
1517     """Returns the archived job file for a give job id.
1518
1519     @type job_id: str
1520     @param job_id: the job identifier
1521     @rtype: str
1522     @return: the path to the archived job file
1523
1524     """
1525     return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1526                           cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1527
1528   def _GetJobIDsUnlocked(self, sort=True):
1529     """Return all known job IDs.
1530
1531     The method only looks at disk because it's a requirement that all
1532     jobs are present on disk (so in the _memcache we don't have any
1533     extra IDs).
1534
1535     @type sort: boolean
1536     @param sort: perform sorting on the returned job ids
1537     @rtype: list
1538     @return: the list of job IDs
1539
1540     """
1541     jlist = []
1542     for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1543       m = self._RE_JOB_FILE.match(filename)
1544       if m:
1545         jlist.append(m.group(1))
1546     if sort:
1547       jlist = utils.NiceSort(jlist)
1548     return jlist
1549
1550   def _LoadJobUnlocked(self, job_id):
1551     """Loads a job from the disk or memory.
1552
1553     Given a job id, this will return the cached job object if
1554     existing, or try to load the job from the disk. If loading from
1555     disk, it will also add the job to the cache.
1556
1557     @param job_id: the job id
1558     @rtype: L{_QueuedJob} or None
1559     @return: either None or the job object
1560
1561     """
1562     job = self._memcache.get(job_id, None)
1563     if job:
1564       logging.debug("Found job %s in memcache", job_id)
1565       return job
1566
1567     try:
1568       job = self._LoadJobFromDisk(job_id)
1569       if job is None:
1570         return job
1571     except errors.JobFileCorrupted:
1572       old_path = self._GetJobPath(job_id)
1573       new_path = self._GetArchivedJobPath(job_id)
1574       if old_path == new_path:
1575         # job already archived (future case)
1576         logging.exception("Can't parse job %s", job_id)
1577       else:
1578         # non-archived case
1579         logging.exception("Can't parse job %s, will archive.", job_id)
1580         self._RenameFilesUnlocked([(old_path, new_path)])
1581       return None
1582
1583     self._memcache[job_id] = job
1584     logging.debug("Added job %s to the cache", job_id)
1585     return job
1586
1587   def _LoadJobFromDisk(self, job_id):
1588     """Load the given job file from disk.
1589
1590     Given a job file, read, load and restore it in a _QueuedJob format.
1591
1592     @type job_id: string
1593     @param job_id: job identifier
1594     @rtype: L{_QueuedJob} or None
1595     @return: either None or the job object
1596
1597     """
1598     filepath = self._GetJobPath(job_id)
1599     logging.debug("Loading job from %s", filepath)
1600     try:
1601       raw_data = utils.ReadFile(filepath)
1602     except EnvironmentError, err:
1603       if err.errno in (errno.ENOENT, ):
1604         return None
1605       raise
1606
1607     try:
1608       data = serializer.LoadJson(raw_data)
1609       job = _QueuedJob.Restore(self, data)
1610     except Exception, err: # pylint: disable-msg=W0703
1611       raise errors.JobFileCorrupted(err)
1612
1613     return job
1614
1615   def SafeLoadJobFromDisk(self, job_id):
1616     """Load the given job file from disk.
1617
1618     Given a job file, read, load and restore it in a _QueuedJob format.
1619     In case of error reading the job, it gets returned as None, and the
1620     exception is logged.
1621
1622     @type job_id: string
1623     @param job_id: job identifier
1624     @rtype: L{_QueuedJob} or None
1625     @return: either None or the job object
1626
1627     """
1628     try:
1629       return self._LoadJobFromDisk(job_id)
1630     except (errors.JobFileCorrupted, EnvironmentError):
1631       logging.exception("Can't load/parse job %s", job_id)
1632       return None
1633
1634   @staticmethod
1635   def _IsQueueMarkedDrain():
1636     """Check if the queue is marked from drain.
1637
1638     This currently uses the queue drain file, which makes it a
1639     per-node flag. In the future this can be moved to the config file.
1640
1641     @rtype: boolean
1642     @return: True of the job queue is marked for draining
1643
1644     """
1645     return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1646
1647   def _UpdateQueueSizeUnlocked(self):
1648     """Update the queue size.
1649
1650     """
1651     self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1652
1653   @locking.ssynchronized(_LOCK)
1654   @_RequireOpenQueue
1655   def SetDrainFlag(self, drain_flag):
1656     """Sets the drain flag for the queue.
1657
1658     @type drain_flag: boolean
1659     @param drain_flag: Whether to set or unset the drain flag
1660
1661     """
1662     getents = runtime.GetEnts()
1663
1664     if drain_flag:
1665       utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True,
1666                       uid=getents.masterd_uid, gid=getents.masterd_gid)
1667     else:
1668       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1669
1670     self._drained = drain_flag
1671
1672     return True
1673
1674   @_RequireOpenQueue
1675   def _SubmitJobUnlocked(self, job_id, ops):
1676     """Create and store a new job.
1677
1678     This enters the job into our job queue and also puts it on the new
1679     queue, in order for it to be picked up by the queue processors.
1680
1681     @type job_id: job ID
1682     @param job_id: the job ID for the new job
1683     @type ops: list
1684     @param ops: The list of OpCodes that will become the new job.
1685     @rtype: L{_QueuedJob}
1686     @return: the job object to be queued
1687     @raise errors.JobQueueDrainError: if the job queue is marked for draining
1688     @raise errors.JobQueueFull: if the job queue has too many jobs in it
1689     @raise errors.GenericError: If an opcode is not valid
1690
1691     """
1692     # Ok when sharing the big job queue lock, as the drain file is created when
1693     # the lock is exclusive.
1694     if self._drained:
1695       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1696
1697     if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1698       raise errors.JobQueueFull()
1699
1700     job = _QueuedJob(self, job_id, ops)
1701
1702     # Check priority
1703     for idx, op in enumerate(job.ops):
1704       if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
1705         allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
1706         raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
1707                                   " are %s" % (idx, op.priority, allowed))
1708
1709     # Write to disk
1710     self.UpdateJobUnlocked(job)
1711
1712     self._queue_size += 1
1713
1714     logging.debug("Adding new job %s to the cache", job_id)
1715     self._memcache[job_id] = job
1716
1717     return job
1718
1719   @locking.ssynchronized(_LOCK)
1720   @_RequireOpenQueue
1721   def SubmitJob(self, ops):
1722     """Create and store a new job.
1723
1724     @see: L{_SubmitJobUnlocked}
1725
1726     """
1727     job_id = self._NewSerialsUnlocked(1)[0]
1728     self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
1729     return job_id
1730
1731   @locking.ssynchronized(_LOCK)
1732   @_RequireOpenQueue
1733   def SubmitManyJobs(self, jobs):
1734     """Create and store multiple jobs.
1735
1736     @see: L{_SubmitJobUnlocked}
1737
1738     """
1739     results = []
1740     added_jobs = []
1741     all_job_ids = self._NewSerialsUnlocked(len(jobs))
1742     for job_id, ops in zip(all_job_ids, jobs):
1743       try:
1744         added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
1745         status = True
1746         data = job_id
1747       except errors.GenericError, err:
1748         data = str(err)
1749         status = False
1750       results.append((status, data))
1751
1752     self._EnqueueJobs(added_jobs)
1753
1754     return results
1755
1756   def _EnqueueJobs(self, jobs):
1757     """Helper function to add jobs to worker pool's queue.
1758
1759     @type jobs: list
1760     @param jobs: List of all jobs
1761
1762     """
1763     self._wpool.AddManyTasks([(job, ) for job in jobs],
1764                              priority=[job.CalcPriority() for job in jobs])
1765
1766   @_RequireOpenQueue
1767   def UpdateJobUnlocked(self, job, replicate=True):
1768     """Update a job's on disk storage.
1769
1770     After a job has been modified, this function needs to be called in
1771     order to write the changes to disk and replicate them to the other
1772     nodes.
1773
1774     @type job: L{_QueuedJob}
1775     @param job: the changed job
1776     @type replicate: boolean
1777     @param replicate: whether to replicate the change to remote nodes
1778
1779     """
1780     if __debug__:
1781       finalized = job.CalcStatus() in constants.JOBS_FINALIZED
1782       assert (finalized ^ (job.end_timestamp is None))
1783
1784     filename = self._GetJobPath(job.id)
1785     data = serializer.DumpJson(job.Serialize(), indent=False)
1786     logging.debug("Writing job %s to %s", job.id, filename)
1787     self._UpdateJobQueueFile(filename, data, replicate)
1788
1789   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1790                         timeout):
1791     """Waits for changes in a job.
1792
1793     @type job_id: string
1794     @param job_id: Job identifier
1795     @type fields: list of strings
1796     @param fields: Which fields to check for changes
1797     @type prev_job_info: list or None
1798     @param prev_job_info: Last job information returned
1799     @type prev_log_serial: int
1800     @param prev_log_serial: Last job message serial number
1801     @type timeout: float
1802     @param timeout: maximum time to wait in seconds
1803     @rtype: tuple (job info, log entries)
1804     @return: a tuple of the job information as required via
1805         the fields parameter, and the log entries as a list
1806
1807         if the job has not changed and the timeout has expired,
1808         we instead return a special value,
1809         L{constants.JOB_NOTCHANGED}, which should be interpreted
1810         as such by the clients
1811
1812     """
1813     load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)
1814
1815     helper = _WaitForJobChangesHelper()
1816
1817     return helper(self._GetJobPath(job_id), load_fn,
1818                   fields, prev_job_info, prev_log_serial, timeout)
1819
1820   @locking.ssynchronized(_LOCK)
1821   @_RequireOpenQueue
1822   def CancelJob(self, job_id):
1823     """Cancels a job.
1824
1825     This will only succeed if the job has not started yet.
1826
1827     @type job_id: string
1828     @param job_id: job ID of job to be cancelled.
1829
1830     """
1831     logging.info("Cancelling job %s", job_id)
1832
1833     job = self._LoadJobUnlocked(job_id)
1834     if not job:
1835       logging.debug("Job %s not found", job_id)
1836       return (False, "Job %s not found" % job_id)
1837
1838     (success, msg) = job.Cancel()
1839
1840     if success:
1841       # If the job was finalized (e.g. cancelled), this is the final write
1842       # allowed. The job can be archived anytime.
1843       self.UpdateJobUnlocked(job)
1844
1845     return (success, msg)
1846
1847   @_RequireOpenQueue
1848   def _ArchiveJobsUnlocked(self, jobs):
1849     """Archives jobs.
1850
1851     @type jobs: list of L{_QueuedJob}
1852     @param jobs: Job objects
1853     @rtype: int
1854     @return: Number of archived jobs
1855
1856     """
1857     archive_jobs = []
1858     rename_files = []
1859     for job in jobs:
1860       if job.CalcStatus() not in constants.JOBS_FINALIZED:
1861         logging.debug("Job %s is not yet done", job.id)
1862         continue
1863
1864       archive_jobs.append(job)
1865
1866       old = self._GetJobPath(job.id)
1867       new = self._GetArchivedJobPath(job.id)
1868       rename_files.append((old, new))
1869
1870     # TODO: What if 1..n files fail to rename?
1871     self._RenameFilesUnlocked(rename_files)
1872
1873     logging.debug("Successfully archived job(s) %s",
1874                   utils.CommaJoin(job.id for job in archive_jobs))
1875
1876     # Since we haven't quite checked, above, if we succeeded or failed renaming
1877     # the files, we update the cached queue size from the filesystem. When we
1878     # get around to fix the TODO: above, we can use the number of actually
1879     # archived jobs to fix this.
1880     self._UpdateQueueSizeUnlocked()
1881     return len(archive_jobs)
1882
1883   @locking.ssynchronized(_LOCK)
1884   @_RequireOpenQueue
1885   def ArchiveJob(self, job_id):
1886     """Archives a job.
1887
1888     This is just a wrapper over L{_ArchiveJobsUnlocked}.
1889
1890     @type job_id: string
1891     @param job_id: Job ID of job to be archived.
1892     @rtype: bool
1893     @return: Whether job was archived
1894
1895     """
1896     logging.info("Archiving job %s", job_id)
1897
1898     job = self._LoadJobUnlocked(job_id)
1899     if not job:
1900       logging.debug("Job %s not found", job_id)
1901       return False
1902
1903     return self._ArchiveJobsUnlocked([job]) == 1
1904
1905   @locking.ssynchronized(_LOCK)
1906   @_RequireOpenQueue
1907   def AutoArchiveJobs(self, age, timeout):
1908     """Archives all jobs based on age.
1909
1910     The method will archive all jobs which are older than the age
1911     parameter. For jobs that don't have an end timestamp, the start
1912     timestamp will be considered. The special '-1' age will cause
1913     archival of all jobs (that are not running or queued).
1914
1915     @type age: int
1916     @param age: the minimum age in seconds
1917
1918     """
1919     logging.info("Archiving jobs with age more than %s seconds", age)
1920
1921     now = time.time()
1922     end_time = now + timeout
1923     archived_count = 0
1924     last_touched = 0
1925
1926     all_job_ids = self._GetJobIDsUnlocked()
1927     pending = []
1928     for idx, job_id in enumerate(all_job_ids):
1929       last_touched = idx + 1
1930
1931       # Not optimal because jobs could be pending
1932       # TODO: Measure average duration for job archival and take number of
1933       # pending jobs into account.
1934       if time.time() > end_time:
1935         break
1936
1937       # Returns None if the job failed to load
1938       job = self._LoadJobUnlocked(job_id)
1939       if job:
1940         if job.end_timestamp is None:
1941           if job.start_timestamp is None:
1942             job_age = job.received_timestamp
1943           else:
1944             job_age = job.start_timestamp
1945         else:
1946           job_age = job.end_timestamp
1947
1948         if age == -1 or now - job_age[0] > age:
1949           pending.append(job)
1950
1951           # Archive 10 jobs at a time
1952           if len(pending) >= 10:
1953             archived_count += self._ArchiveJobsUnlocked(pending)
1954             pending = []
1955
1956     if pending:
1957       archived_count += self._ArchiveJobsUnlocked(pending)
1958
1959     return (archived_count, len(all_job_ids) - last_touched)
1960
1961   def QueryJobs(self, job_ids, fields):
1962     """Returns a list of jobs in queue.
1963
1964     @type job_ids: list
1965     @param job_ids: sequence of job identifiers or None for all
1966     @type fields: list
1967     @param fields: names of fields to return
1968     @rtype: list
1969     @return: list one element per job, each element being list with
1970         the requested fields
1971
1972     """
1973     jobs = []
1974     list_all = False
1975     if not job_ids:
1976       # Since files are added to/removed from the queue atomically, there's no
1977       # risk of getting the job ids in an inconsistent state.
1978       job_ids = self._GetJobIDsUnlocked()
1979       list_all = True
1980
1981     for job_id in job_ids:
1982       job = self.SafeLoadJobFromDisk(job_id)
1983       if job is not None:
1984         jobs.append(job.GetInfo(fields))
1985       elif not list_all:
1986         jobs.append(None)
1987
1988     return jobs
1989
1990   @locking.ssynchronized(_LOCK)
1991   @_RequireOpenQueue
1992   def Shutdown(self):
1993     """Stops the job queue.
1994
1995     This shutdowns all the worker threads an closes the queue.
1996
1997     """
1998     self._wpool.TerminateWorkers()
1999
2000     self._queue_filelock.Close()
2001     self._queue_filelock = None