--select-instances hbal manpage update
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import logging
33 import errno
34 import re
35 import time
36 import weakref
37
38 try:
39   # pylint: disable-msg=E0611
40   from pyinotify import pyinotify
41 except ImportError:
42   import pyinotify
43
44 from ganeti import asyncnotifier
45 from ganeti import constants
46 from ganeti import serializer
47 from ganeti import workerpool
48 from ganeti import locking
49 from ganeti import opcodes
50 from ganeti import errors
51 from ganeti import mcpu
52 from ganeti import utils
53 from ganeti import jstore
54 from ganeti import rpc
55 from ganeti import runtime
56 from ganeti import netutils
57 from ganeti import compat
58
59
60 JOBQUEUE_THREADS = 25
61 JOBS_PER_ARCHIVE_DIRECTORY = 10000
62
63 # member lock names to be passed to @ssynchronized decorator
64 _LOCK = "_lock"
65 _QUEUE = "_queue"
66
67
68 class CancelJob(Exception):
69   """Special exception to cancel a job.
70
71   """
72
73
74 def TimeStampNow():
75   """Returns the current timestamp.
76
77   @rtype: tuple
78   @return: the current time in the (seconds, microseconds) format
79
80   """
81   return utils.SplitTime(time.time())
82
83
84 class _QueuedOpCode(object):
85   """Encapsulates an opcode object.
86
87   @ivar log: holds the execution log and consists of tuples
88   of the form C{(log_serial, timestamp, level, message)}
89   @ivar input: the OpCode we encapsulate
90   @ivar status: the current status
91   @ivar result: the result of the LU execution
92   @ivar start_timestamp: timestamp for the start of the execution
93   @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
94   @ivar stop_timestamp: timestamp for the end of the execution
95
96   """
97   __slots__ = ["input", "status", "result", "log", "priority",
98                "start_timestamp", "exec_timestamp", "end_timestamp",
99                "__weakref__"]
100
101   def __init__(self, op):
102     """Constructor for the _QuededOpCode.
103
104     @type op: L{opcodes.OpCode}
105     @param op: the opcode we encapsulate
106
107     """
108     self.input = op
109     self.status = constants.OP_STATUS_QUEUED
110     self.result = None
111     self.log = []
112     self.start_timestamp = None
113     self.exec_timestamp = None
114     self.end_timestamp = None
115
116     # Get initial priority (it might change during the lifetime of this opcode)
117     self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
118
119   @classmethod
120   def Restore(cls, state):
121     """Restore the _QueuedOpCode from the serialized form.
122
123     @type state: dict
124     @param state: the serialized state
125     @rtype: _QueuedOpCode
126     @return: a new _QueuedOpCode instance
127
128     """
129     obj = _QueuedOpCode.__new__(cls)
130     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
131     obj.status = state["status"]
132     obj.result = state["result"]
133     obj.log = state["log"]
134     obj.start_timestamp = state.get("start_timestamp", None)
135     obj.exec_timestamp = state.get("exec_timestamp", None)
136     obj.end_timestamp = state.get("end_timestamp", None)
137     obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
138     return obj
139
140   def Serialize(self):
141     """Serializes this _QueuedOpCode.
142
143     @rtype: dict
144     @return: the dictionary holding the serialized state
145
146     """
147     return {
148       "input": self.input.__getstate__(),
149       "status": self.status,
150       "result": self.result,
151       "log": self.log,
152       "start_timestamp": self.start_timestamp,
153       "exec_timestamp": self.exec_timestamp,
154       "end_timestamp": self.end_timestamp,
155       "priority": self.priority,
156       }
157
158
159 class _QueuedJob(object):
160   """In-memory job representation.
161
162   This is what we use to track the user-submitted jobs. Locking must
163   be taken care of by users of this class.
164
165   @type queue: L{JobQueue}
166   @ivar queue: the parent queue
167   @ivar id: the job ID
168   @type ops: list
169   @ivar ops: the list of _QueuedOpCode that constitute the job
170   @type log_serial: int
171   @ivar log_serial: holds the index for the next log entry
172   @ivar received_timestamp: the timestamp for when the job was received
173   @ivar start_timestmap: the timestamp for start of execution
174   @ivar end_timestamp: the timestamp for end of execution
175
176   """
177   # pylint: disable-msg=W0212
178   __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
179                "received_timestamp", "start_timestamp", "end_timestamp",
180                "__weakref__"]
181
182   def __init__(self, queue, job_id, ops):
183     """Constructor for the _QueuedJob.
184
185     @type queue: L{JobQueue}
186     @param queue: our parent queue
187     @type job_id: job_id
188     @param job_id: our job id
189     @type ops: list
190     @param ops: the list of opcodes we hold, which will be encapsulated
191         in _QueuedOpCodes
192
193     """
194     if not ops:
195       raise errors.GenericError("A job needs at least one opcode")
196
197     self.queue = queue
198     self.id = job_id
199     self.ops = [_QueuedOpCode(op) for op in ops]
200     self.log_serial = 0
201     self.received_timestamp = TimeStampNow()
202     self.start_timestamp = None
203     self.end_timestamp = None
204
205     self._InitInMemory(self)
206
207   @staticmethod
208   def _InitInMemory(obj):
209     """Initializes in-memory variables.
210
211     """
212     obj.ops_iter = None
213     obj.cur_opctx = None
214
215   def __repr__(self):
216     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
217               "id=%s" % self.id,
218               "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
219
220     return "<%s at %#x>" % (" ".join(status), id(self))
221
222   @classmethod
223   def Restore(cls, queue, state):
224     """Restore a _QueuedJob from serialized state:
225
226     @type queue: L{JobQueue}
227     @param queue: to which queue the restored job belongs
228     @type state: dict
229     @param state: the serialized state
230     @rtype: _JobQueue
231     @return: the restored _JobQueue instance
232
233     """
234     obj = _QueuedJob.__new__(cls)
235     obj.queue = queue
236     obj.id = state["id"]
237     obj.received_timestamp = state.get("received_timestamp", None)
238     obj.start_timestamp = state.get("start_timestamp", None)
239     obj.end_timestamp = state.get("end_timestamp", None)
240
241     obj.ops = []
242     obj.log_serial = 0
243     for op_state in state["ops"]:
244       op = _QueuedOpCode.Restore(op_state)
245       for log_entry in op.log:
246         obj.log_serial = max(obj.log_serial, log_entry[0])
247       obj.ops.append(op)
248
249     cls._InitInMemory(obj)
250
251     return obj
252
253   def Serialize(self):
254     """Serialize the _JobQueue instance.
255
256     @rtype: dict
257     @return: the serialized state
258
259     """
260     return {
261       "id": self.id,
262       "ops": [op.Serialize() for op in self.ops],
263       "start_timestamp": self.start_timestamp,
264       "end_timestamp": self.end_timestamp,
265       "received_timestamp": self.received_timestamp,
266       }
267
268   def CalcStatus(self):
269     """Compute the status of this job.
270
271     This function iterates over all the _QueuedOpCodes in the job and
272     based on their status, computes the job status.
273
274     The algorithm is:
275       - if we find a cancelled, or finished with error, the job
276         status will be the same
277       - otherwise, the last opcode with the status one of:
278           - waitlock
279           - canceling
280           - running
281
282         will determine the job status
283
284       - otherwise, it means either all opcodes are queued, or success,
285         and the job status will be the same
286
287     @return: the job status
288
289     """
290     status = constants.JOB_STATUS_QUEUED
291
292     all_success = True
293     for op in self.ops:
294       if op.status == constants.OP_STATUS_SUCCESS:
295         continue
296
297       all_success = False
298
299       if op.status == constants.OP_STATUS_QUEUED:
300         pass
301       elif op.status == constants.OP_STATUS_WAITLOCK:
302         status = constants.JOB_STATUS_WAITLOCK
303       elif op.status == constants.OP_STATUS_RUNNING:
304         status = constants.JOB_STATUS_RUNNING
305       elif op.status == constants.OP_STATUS_CANCELING:
306         status = constants.JOB_STATUS_CANCELING
307         break
308       elif op.status == constants.OP_STATUS_ERROR:
309         status = constants.JOB_STATUS_ERROR
310         # The whole job fails if one opcode failed
311         break
312       elif op.status == constants.OP_STATUS_CANCELED:
313         status = constants.OP_STATUS_CANCELED
314         break
315
316     if all_success:
317       status = constants.JOB_STATUS_SUCCESS
318
319     return status
320
321   def CalcPriority(self):
322     """Gets the current priority for this job.
323
324     Only unfinished opcodes are considered. When all are done, the default
325     priority is used.
326
327     @rtype: int
328
329     """
330     priorities = [op.priority for op in self.ops
331                   if op.status not in constants.OPS_FINALIZED]
332
333     if not priorities:
334       # All opcodes are done, assume default priority
335       return constants.OP_PRIO_DEFAULT
336
337     return min(priorities)
338
339   def GetLogEntries(self, newer_than):
340     """Selectively returns the log entries.
341
342     @type newer_than: None or int
343     @param newer_than: if this is None, return all log entries,
344         otherwise return only the log entries with serial higher
345         than this value
346     @rtype: list
347     @return: the list of the log entries selected
348
349     """
350     if newer_than is None:
351       serial = -1
352     else:
353       serial = newer_than
354
355     entries = []
356     for op in self.ops:
357       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
358
359     return entries
360
361   def GetInfo(self, fields):
362     """Returns information about a job.
363
364     @type fields: list
365     @param fields: names of fields to return
366     @rtype: list
367     @return: list with one element for each field
368     @raise errors.OpExecError: when an invalid field
369         has been passed
370
371     """
372     row = []
373     for fname in fields:
374       if fname == "id":
375         row.append(self.id)
376       elif fname == "status":
377         row.append(self.CalcStatus())
378       elif fname == "priority":
379         row.append(self.CalcPriority())
380       elif fname == "ops":
381         row.append([op.input.__getstate__() for op in self.ops])
382       elif fname == "opresult":
383         row.append([op.result for op in self.ops])
384       elif fname == "opstatus":
385         row.append([op.status for op in self.ops])
386       elif fname == "oplog":
387         row.append([op.log for op in self.ops])
388       elif fname == "opstart":
389         row.append([op.start_timestamp for op in self.ops])
390       elif fname == "opexec":
391         row.append([op.exec_timestamp for op in self.ops])
392       elif fname == "opend":
393         row.append([op.end_timestamp for op in self.ops])
394       elif fname == "oppriority":
395         row.append([op.priority for op in self.ops])
396       elif fname == "received_ts":
397         row.append(self.received_timestamp)
398       elif fname == "start_ts":
399         row.append(self.start_timestamp)
400       elif fname == "end_ts":
401         row.append(self.end_timestamp)
402       elif fname == "summary":
403         row.append([op.input.Summary() for op in self.ops])
404       else:
405         raise errors.OpExecError("Invalid self query field '%s'" % fname)
406     return row
407
408   def MarkUnfinishedOps(self, status, result):
409     """Mark unfinished opcodes with a given status and result.
410
411     This is an utility function for marking all running or waiting to
412     be run opcodes with a given status. Opcodes which are already
413     finalised are not changed.
414
415     @param status: a given opcode status
416     @param result: the opcode result
417
418     """
419     not_marked = True
420     for op in self.ops:
421       if op.status in constants.OPS_FINALIZED:
422         assert not_marked, "Finalized opcodes found after non-finalized ones"
423         continue
424       op.status = status
425       op.result = result
426       not_marked = False
427
428   def Finalize(self):
429     """Marks the job as finalized.
430
431     """
432     self.end_timestamp = TimeStampNow()
433
434   def Cancel(self):
435     """Marks job as canceled/-ing if possible.
436
437     @rtype: tuple; (bool, string)
438     @return: Boolean describing whether job was successfully canceled or marked
439       as canceling and a text message
440
441     """
442     status = self.CalcStatus()
443
444     if status == constants.JOB_STATUS_QUEUED:
445       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
446                              "Job canceled by request")
447       self.Finalize()
448       return (True, "Job %s canceled" % self.id)
449
450     elif status == constants.JOB_STATUS_WAITLOCK:
451       # The worker will notice the new status and cancel the job
452       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
453       return (True, "Job %s will be canceled" % self.id)
454
455     else:
456       logging.debug("Job %s is no longer waiting in the queue", self.id)
457       return (False, "Job %s is no longer waiting in the queue" % self.id)
458
459
460 class _OpExecCallbacks(mcpu.OpExecCbBase):
461   def __init__(self, queue, job, op):
462     """Initializes this class.
463
464     @type queue: L{JobQueue}
465     @param queue: Job queue
466     @type job: L{_QueuedJob}
467     @param job: Job object
468     @type op: L{_QueuedOpCode}
469     @param op: OpCode
470
471     """
472     assert queue, "Queue is missing"
473     assert job, "Job is missing"
474     assert op, "Opcode is missing"
475
476     self._queue = queue
477     self._job = job
478     self._op = op
479
480   def _CheckCancel(self):
481     """Raises an exception to cancel the job if asked to.
482
483     """
484     # Cancel here if we were asked to
485     if self._op.status == constants.OP_STATUS_CANCELING:
486       logging.debug("Canceling opcode")
487       raise CancelJob()
488
489   @locking.ssynchronized(_QUEUE, shared=1)
490   def NotifyStart(self):
491     """Mark the opcode as running, not lock-waiting.
492
493     This is called from the mcpu code as a notifier function, when the LU is
494     finally about to start the Exec() method. Of course, to have end-user
495     visible results, the opcode must be initially (before calling into
496     Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
497
498     """
499     assert self._op in self._job.ops
500     assert self._op.status in (constants.OP_STATUS_WAITLOCK,
501                                constants.OP_STATUS_CANCELING)
502
503     # Cancel here if we were asked to
504     self._CheckCancel()
505
506     logging.debug("Opcode is now running")
507
508     self._op.status = constants.OP_STATUS_RUNNING
509     self._op.exec_timestamp = TimeStampNow()
510
511     # And finally replicate the job status
512     self._queue.UpdateJobUnlocked(self._job)
513
514   @locking.ssynchronized(_QUEUE, shared=1)
515   def _AppendFeedback(self, timestamp, log_type, log_msg):
516     """Internal feedback append function, with locks
517
518     """
519     self._job.log_serial += 1
520     self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
521     self._queue.UpdateJobUnlocked(self._job, replicate=False)
522
523   def Feedback(self, *args):
524     """Append a log entry.
525
526     """
527     assert len(args) < 3
528
529     if len(args) == 1:
530       log_type = constants.ELOG_MESSAGE
531       log_msg = args[0]
532     else:
533       (log_type, log_msg) = args
534
535     # The time is split to make serialization easier and not lose
536     # precision.
537     timestamp = utils.SplitTime(time.time())
538     self._AppendFeedback(timestamp, log_type, log_msg)
539
540   def CheckCancel(self):
541     """Check whether job has been cancelled.
542
543     """
544     assert self._op.status in (constants.OP_STATUS_WAITLOCK,
545                                constants.OP_STATUS_CANCELING)
546
547     # Cancel here if we were asked to
548     self._CheckCancel()
549
550   def SubmitManyJobs(self, jobs):
551     """Submits jobs for processing.
552
553     See L{JobQueue.SubmitManyJobs}.
554
555     """
556     # Locking is done in job queue
557     return self._queue.SubmitManyJobs(jobs)
558
559
560 class _JobChangesChecker(object):
561   def __init__(self, fields, prev_job_info, prev_log_serial):
562     """Initializes this class.
563
564     @type fields: list of strings
565     @param fields: Fields requested by LUXI client
566     @type prev_job_info: string
567     @param prev_job_info: previous job info, as passed by the LUXI client
568     @type prev_log_serial: string
569     @param prev_log_serial: previous job serial, as passed by the LUXI client
570
571     """
572     self._fields = fields
573     self._prev_job_info = prev_job_info
574     self._prev_log_serial = prev_log_serial
575
576   def __call__(self, job):
577     """Checks whether job has changed.
578
579     @type job: L{_QueuedJob}
580     @param job: Job object
581
582     """
583     status = job.CalcStatus()
584     job_info = job.GetInfo(self._fields)
585     log_entries = job.GetLogEntries(self._prev_log_serial)
586
587     # Serializing and deserializing data can cause type changes (e.g. from
588     # tuple to list) or precision loss. We're doing it here so that we get
589     # the same modifications as the data received from the client. Without
590     # this, the comparison afterwards might fail without the data being
591     # significantly different.
592     # TODO: we just deserialized from disk, investigate how to make sure that
593     # the job info and log entries are compatible to avoid this further step.
594     # TODO: Doing something like in testutils.py:UnifyValueType might be more
595     # efficient, though floats will be tricky
596     job_info = serializer.LoadJson(serializer.DumpJson(job_info))
597     log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
598
599     # Don't even try to wait if the job is no longer running, there will be
600     # no changes.
601     if (status not in (constants.JOB_STATUS_QUEUED,
602                        constants.JOB_STATUS_RUNNING,
603                        constants.JOB_STATUS_WAITLOCK) or
604         job_info != self._prev_job_info or
605         (log_entries and self._prev_log_serial != log_entries[0][0])):
606       logging.debug("Job %s changed", job.id)
607       return (job_info, log_entries)
608
609     return None
610
611
612 class _JobFileChangesWaiter(object):
613   def __init__(self, filename):
614     """Initializes this class.
615
616     @type filename: string
617     @param filename: Path to job file
618     @raises errors.InotifyError: if the notifier cannot be setup
619
620     """
621     self._wm = pyinotify.WatchManager()
622     self._inotify_handler = \
623       asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
624     self._notifier = \
625       pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
626     try:
627       self._inotify_handler.enable()
628     except Exception:
629       # pyinotify doesn't close file descriptors automatically
630       self._notifier.stop()
631       raise
632
633   def _OnInotify(self, notifier_enabled):
634     """Callback for inotify.
635
636     """
637     if not notifier_enabled:
638       self._inotify_handler.enable()
639
640   def Wait(self, timeout):
641     """Waits for the job file to change.
642
643     @type timeout: float
644     @param timeout: Timeout in seconds
645     @return: Whether there have been events
646
647     """
648     assert timeout >= 0
649     have_events = self._notifier.check_events(timeout * 1000)
650     if have_events:
651       self._notifier.read_events()
652     self._notifier.process_events()
653     return have_events
654
655   def Close(self):
656     """Closes underlying notifier and its file descriptor.
657
658     """
659     self._notifier.stop()
660
661
662 class _JobChangesWaiter(object):
663   def __init__(self, filename):
664     """Initializes this class.
665
666     @type filename: string
667     @param filename: Path to job file
668
669     """
670     self._filewaiter = None
671     self._filename = filename
672
673   def Wait(self, timeout):
674     """Waits for a job to change.
675
676     @type timeout: float
677     @param timeout: Timeout in seconds
678     @return: Whether there have been events
679
680     """
681     if self._filewaiter:
682       return self._filewaiter.Wait(timeout)
683
684     # Lazy setup: Avoid inotify setup cost when job file has already changed.
685     # If this point is reached, return immediately and let caller check the job
686     # file again in case there were changes since the last check. This avoids a
687     # race condition.
688     self._filewaiter = _JobFileChangesWaiter(self._filename)
689
690     return True
691
692   def Close(self):
693     """Closes underlying waiter.
694
695     """
696     if self._filewaiter:
697       self._filewaiter.Close()
698
699
700 class _WaitForJobChangesHelper(object):
701   """Helper class using inotify to wait for changes in a job file.
702
703   This class takes a previous job status and serial, and alerts the client when
704   the current job status has changed.
705
706   """
707   @staticmethod
708   def _CheckForChanges(job_load_fn, check_fn):
709     job = job_load_fn()
710     if not job:
711       raise errors.JobLost()
712
713     result = check_fn(job)
714     if result is None:
715       raise utils.RetryAgain()
716
717     return result
718
719   def __call__(self, filename, job_load_fn,
720                fields, prev_job_info, prev_log_serial, timeout):
721     """Waits for changes on a job.
722
723     @type filename: string
724     @param filename: File on which to wait for changes
725     @type job_load_fn: callable
726     @param job_load_fn: Function to load job
727     @type fields: list of strings
728     @param fields: Which fields to check for changes
729     @type prev_job_info: list or None
730     @param prev_job_info: Last job information returned
731     @type prev_log_serial: int
732     @param prev_log_serial: Last job message serial number
733     @type timeout: float
734     @param timeout: maximum time to wait in seconds
735
736     """
737     try:
738       check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
739       waiter = _JobChangesWaiter(filename)
740       try:
741         return utils.Retry(compat.partial(self._CheckForChanges,
742                                           job_load_fn, check_fn),
743                            utils.RETRY_REMAINING_TIME, timeout,
744                            wait_fn=waiter.Wait)
745       finally:
746         waiter.Close()
747     except (errors.InotifyError, errors.JobLost):
748       return None
749     except utils.RetryTimeout:
750       return constants.JOB_NOTCHANGED
751
752
753 def _EncodeOpError(err):
754   """Encodes an error which occurred while processing an opcode.
755
756   """
757   if isinstance(err, errors.GenericError):
758     to_encode = err
759   else:
760     to_encode = errors.OpExecError(str(err))
761
762   return errors.EncodeException(to_encode)
763
764
765 class _TimeoutStrategyWrapper:
766   def __init__(self, fn):
767     """Initializes this class.
768
769     """
770     self._fn = fn
771     self._next = None
772
773   def _Advance(self):
774     """Gets the next timeout if necessary.
775
776     """
777     if self._next is None:
778       self._next = self._fn()
779
780   def Peek(self):
781     """Returns the next timeout.
782
783     """
784     self._Advance()
785     return self._next
786
787   def Next(self):
788     """Returns the current timeout and advances the internal state.
789
790     """
791     self._Advance()
792     result = self._next
793     self._next = None
794     return result
795
796
797 class _OpExecContext:
798   def __init__(self, op, index, log_prefix, timeout_strategy_factory):
799     """Initializes this class.
800
801     """
802     self.op = op
803     self.index = index
804     self.log_prefix = log_prefix
805     self.summary = op.input.Summary()
806
807     self._timeout_strategy_factory = timeout_strategy_factory
808     self._ResetTimeoutStrategy()
809
810   def _ResetTimeoutStrategy(self):
811     """Creates a new timeout strategy.
812
813     """
814     self._timeout_strategy = \
815       _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
816
817   def CheckPriorityIncrease(self):
818     """Checks whether priority can and should be increased.
819
820     Called when locks couldn't be acquired.
821
822     """
823     op = self.op
824
825     # Exhausted all retries and next round should not use blocking acquire
826     # for locks?
827     if (self._timeout_strategy.Peek() is None and
828         op.priority > constants.OP_PRIO_HIGHEST):
829       logging.debug("Increasing priority")
830       op.priority -= 1
831       self._ResetTimeoutStrategy()
832       return True
833
834     return False
835
836   def GetNextLockTimeout(self):
837     """Returns the next lock acquire timeout.
838
839     """
840     return self._timeout_strategy.Next()
841
842
843 class _JobProcessor(object):
844   def __init__(self, queue, opexec_fn, job,
845                _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
846     """Initializes this class.
847
848     """
849     self.queue = queue
850     self.opexec_fn = opexec_fn
851     self.job = job
852     self._timeout_strategy_factory = _timeout_strategy_factory
853
854   @staticmethod
855   def _FindNextOpcode(job, timeout_strategy_factory):
856     """Locates the next opcode to run.
857
858     @type job: L{_QueuedJob}
859     @param job: Job object
860     @param timeout_strategy_factory: Callable to create new timeout strategy
861
862     """
863     # Create some sort of a cache to speed up locating next opcode for future
864     # lookups
865     # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
866     # pending and one for processed ops.
867     if job.ops_iter is None:
868       job.ops_iter = enumerate(job.ops)
869
870     # Find next opcode to run
871     while True:
872       try:
873         (idx, op) = job.ops_iter.next()
874       except StopIteration:
875         raise errors.ProgrammerError("Called for a finished job")
876
877       if op.status == constants.OP_STATUS_RUNNING:
878         # Found an opcode already marked as running
879         raise errors.ProgrammerError("Called for job marked as running")
880
881       opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
882                              timeout_strategy_factory)
883
884       if op.status not in constants.OPS_FINALIZED:
885         return opctx
886
887       # This is a job that was partially completed before master daemon
888       # shutdown, so it can be expected that some opcodes are already
889       # completed successfully (if any did error out, then the whole job
890       # should have been aborted and not resubmitted for processing).
891       logging.info("%s: opcode %s already processed, skipping",
892                    opctx.log_prefix, opctx.summary)
893
894   @staticmethod
895   def _MarkWaitlock(job, op):
896     """Marks an opcode as waiting for locks.
897
898     The job's start timestamp is also set if necessary.
899
900     @type job: L{_QueuedJob}
901     @param job: Job object
902     @type op: L{_QueuedOpCode}
903     @param op: Opcode object
904
905     """
906     assert op in job.ops
907     assert op.status in (constants.OP_STATUS_QUEUED,
908                          constants.OP_STATUS_WAITLOCK)
909
910     update = False
911
912     op.result = None
913
914     if op.status == constants.OP_STATUS_QUEUED:
915       op.status = constants.OP_STATUS_WAITLOCK
916       update = True
917
918     if op.start_timestamp is None:
919       op.start_timestamp = TimeStampNow()
920       update = True
921
922     if job.start_timestamp is None:
923       job.start_timestamp = op.start_timestamp
924       update = True
925
926     assert op.status == constants.OP_STATUS_WAITLOCK
927
928     return update
929
930   def _ExecOpCodeUnlocked(self, opctx):
931     """Processes one opcode and returns the result.
932
933     """
934     op = opctx.op
935
936     assert op.status == constants.OP_STATUS_WAITLOCK
937
938     timeout = opctx.GetNextLockTimeout()
939
940     try:
941       # Make sure not to hold queue lock while calling ExecOpCode
942       result = self.opexec_fn(op.input,
943                               _OpExecCallbacks(self.queue, self.job, op),
944                               timeout=timeout, priority=op.priority)
945     except mcpu.LockAcquireTimeout:
946       assert timeout is not None, "Received timeout for blocking acquire"
947       logging.debug("Couldn't acquire locks in %0.6fs", timeout)
948
949       assert op.status in (constants.OP_STATUS_WAITLOCK,
950                            constants.OP_STATUS_CANCELING)
951
952       # Was job cancelled while we were waiting for the lock?
953       if op.status == constants.OP_STATUS_CANCELING:
954         return (constants.OP_STATUS_CANCELING, None)
955
956       # Stay in waitlock while trying to re-acquire lock
957       return (constants.OP_STATUS_WAITLOCK, None)
958     except CancelJob:
959       logging.exception("%s: Canceling job", opctx.log_prefix)
960       assert op.status == constants.OP_STATUS_CANCELING
961       return (constants.OP_STATUS_CANCELING, None)
962     except Exception, err: # pylint: disable-msg=W0703
963       logging.exception("%s: Caught exception in %s",
964                         opctx.log_prefix, opctx.summary)
965       return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
966     else:
967       logging.debug("%s: %s successful",
968                     opctx.log_prefix, opctx.summary)
969       return (constants.OP_STATUS_SUCCESS, result)
970
971   def __call__(self, _nextop_fn=None):
972     """Continues execution of a job.
973
974     @param _nextop_fn: Callback function for tests
975     @rtype: bool
976     @return: True if job is finished, False if processor needs to be called
977              again
978
979     """
980     queue = self.queue
981     job = self.job
982
983     logging.debug("Processing job %s", job.id)
984
985     queue.acquire(shared=1)
986     try:
987       opcount = len(job.ops)
988
989       # Don't do anything for finalized jobs
990       if job.CalcStatus() in constants.JOBS_FINALIZED:
991         return True
992
993       # Is a previous opcode still pending?
994       if job.cur_opctx:
995         opctx = job.cur_opctx
996         job.cur_opctx = None
997       else:
998         if __debug__ and _nextop_fn:
999           _nextop_fn()
1000         opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1001
1002       op = opctx.op
1003
1004       # Consistency check
1005       assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1006                                      constants.OP_STATUS_CANCELING)
1007                         for i in job.ops[opctx.index + 1:])
1008
1009       assert op.status in (constants.OP_STATUS_QUEUED,
1010                            constants.OP_STATUS_WAITLOCK,
1011                            constants.OP_STATUS_CANCELING)
1012
1013       assert (op.priority <= constants.OP_PRIO_LOWEST and
1014               op.priority >= constants.OP_PRIO_HIGHEST)
1015
1016       if op.status != constants.OP_STATUS_CANCELING:
1017         assert op.status in (constants.OP_STATUS_QUEUED,
1018                              constants.OP_STATUS_WAITLOCK)
1019
1020         # Prepare to start opcode
1021         if self._MarkWaitlock(job, op):
1022           # Write to disk
1023           queue.UpdateJobUnlocked(job)
1024
1025         assert op.status == constants.OP_STATUS_WAITLOCK
1026         assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1027         assert job.start_timestamp and op.start_timestamp
1028
1029         logging.info("%s: opcode %s waiting for locks",
1030                      opctx.log_prefix, opctx.summary)
1031
1032         queue.release()
1033         try:
1034           (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1035         finally:
1036           queue.acquire(shared=1)
1037
1038         op.status = op_status
1039         op.result = op_result
1040
1041         if op.status == constants.OP_STATUS_WAITLOCK:
1042           # Couldn't get locks in time
1043           assert not op.end_timestamp
1044         else:
1045           # Finalize opcode
1046           op.end_timestamp = TimeStampNow()
1047
1048           if op.status == constants.OP_STATUS_CANCELING:
1049             assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1050                                   for i in job.ops[opctx.index:])
1051           else:
1052             assert op.status in constants.OPS_FINALIZED
1053
1054       if op.status == constants.OP_STATUS_WAITLOCK:
1055         finalize = False
1056
1057         if opctx.CheckPriorityIncrease():
1058           # Priority was changed, need to update on-disk file
1059           queue.UpdateJobUnlocked(job)
1060
1061         # Keep around for another round
1062         job.cur_opctx = opctx
1063
1064         assert (op.priority <= constants.OP_PRIO_LOWEST and
1065                 op.priority >= constants.OP_PRIO_HIGHEST)
1066
1067         # In no case must the status be finalized here
1068         assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
1069
1070       else:
1071         # Ensure all opcodes so far have been successful
1072         assert (opctx.index == 0 or
1073                 compat.all(i.status == constants.OP_STATUS_SUCCESS
1074                            for i in job.ops[:opctx.index]))
1075
1076         # Reset context
1077         job.cur_opctx = None
1078
1079         if op.status == constants.OP_STATUS_SUCCESS:
1080           finalize = False
1081
1082         elif op.status == constants.OP_STATUS_ERROR:
1083           # Ensure failed opcode has an exception as its result
1084           assert errors.GetEncodedError(job.ops[opctx.index].result)
1085
1086           to_encode = errors.OpExecError("Preceding opcode failed")
1087           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1088                                 _EncodeOpError(to_encode))
1089           finalize = True
1090
1091           # Consistency check
1092           assert compat.all(i.status == constants.OP_STATUS_ERROR and
1093                             errors.GetEncodedError(i.result)
1094                             for i in job.ops[opctx.index:])
1095
1096         elif op.status == constants.OP_STATUS_CANCELING:
1097           job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1098                                 "Job canceled by request")
1099           finalize = True
1100
1101         else:
1102           raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1103
1104         if opctx.index == (opcount - 1):
1105           # Finalize on last opcode
1106           finalize = True
1107
1108         if finalize:
1109           # All opcodes have been run, finalize job
1110           job.Finalize()
1111
1112         # Write to disk. If the job status is final, this is the final write
1113         # allowed. Once the file has been written, it can be archived anytime.
1114         queue.UpdateJobUnlocked(job)
1115
1116         if finalize:
1117           logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1118           return True
1119
1120       return False
1121     finally:
1122       queue.release()
1123
1124
1125 class _JobQueueWorker(workerpool.BaseWorker):
1126   """The actual job workers.
1127
1128   """
1129   def RunTask(self, job): # pylint: disable-msg=W0221
1130     """Job executor.
1131
1132     This functions processes a job. It is closely tied to the L{_QueuedJob} and
1133     L{_QueuedOpCode} classes.
1134
1135     @type job: L{_QueuedJob}
1136     @param job: the job to be processed
1137
1138     """
1139     queue = job.queue
1140     assert queue == self.pool.queue
1141
1142     setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1143     setname_fn(None)
1144
1145     proc = mcpu.Processor(queue.context, job.id)
1146
1147     # Create wrapper for setting thread name
1148     wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1149                                     proc.ExecOpCode)
1150
1151     if not _JobProcessor(queue, wrap_execop_fn, job)():
1152       # Schedule again
1153       raise workerpool.DeferTask(priority=job.CalcPriority())
1154
1155   @staticmethod
1156   def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1157     """Updates the worker thread name to include a short summary of the opcode.
1158
1159     @param setname_fn: Callable setting worker thread name
1160     @param execop_fn: Callable for executing opcode (usually
1161                       L{mcpu.Processor.ExecOpCode})
1162
1163     """
1164     setname_fn(op)
1165     try:
1166       return execop_fn(op, *args, **kwargs)
1167     finally:
1168       setname_fn(None)
1169
1170   @staticmethod
1171   def _GetWorkerName(job, op):
1172     """Sets the worker thread name.
1173
1174     @type job: L{_QueuedJob}
1175     @type op: L{opcodes.OpCode}
1176
1177     """
1178     parts = ["Job%s" % job.id]
1179
1180     if op:
1181       parts.append(op.TinySummary())
1182
1183     return "/".join(parts)
1184
1185
1186 class _JobQueueWorkerPool(workerpool.WorkerPool):
1187   """Simple class implementing a job-processing workerpool.
1188
1189   """
1190   def __init__(self, queue):
1191     super(_JobQueueWorkerPool, self).__init__("Jq",
1192                                               JOBQUEUE_THREADS,
1193                                               _JobQueueWorker)
1194     self.queue = queue
1195
1196
1197 def _RequireOpenQueue(fn):
1198   """Decorator for "public" functions.
1199
1200   This function should be used for all 'public' functions. That is,
1201   functions usually called from other classes. Note that this should
1202   be applied only to methods (not plain functions), since it expects
1203   that the decorated function is called with a first argument that has
1204   a '_queue_filelock' argument.
1205
1206   @warning: Use this decorator only after locking.ssynchronized
1207
1208   Example::
1209     @locking.ssynchronized(_LOCK)
1210     @_RequireOpenQueue
1211     def Example(self):
1212       pass
1213
1214   """
1215   def wrapper(self, *args, **kwargs):
1216     # pylint: disable-msg=W0212
1217     assert self._queue_filelock is not None, "Queue should be open"
1218     return fn(self, *args, **kwargs)
1219   return wrapper
1220
1221
1222 class JobQueue(object):
1223   """Queue used to manage the jobs.
1224
1225   @cvar _RE_JOB_FILE: regex matching the valid job file names
1226
1227   """
1228   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
1229
1230   def __init__(self, context):
1231     """Constructor for JobQueue.
1232
1233     The constructor will initialize the job queue object and then
1234     start loading the current jobs from disk, either for starting them
1235     (if they were queue) or for aborting them (if they were already
1236     running).
1237
1238     @type context: GanetiContext
1239     @param context: the context object for access to the configuration
1240         data and other ganeti objects
1241
1242     """
1243     self.context = context
1244     self._memcache = weakref.WeakValueDictionary()
1245     self._my_hostname = netutils.Hostname.GetSysName()
1246
1247     # The Big JobQueue lock. If a code block or method acquires it in shared
1248     # mode safe it must guarantee concurrency with all the code acquiring it in
1249     # shared mode, including itself. In order not to acquire it at all
1250     # concurrency must be guaranteed with all code acquiring it in shared mode
1251     # and all code acquiring it exclusively.
1252     self._lock = locking.SharedLock("JobQueue")
1253
1254     self.acquire = self._lock.acquire
1255     self.release = self._lock.release
1256
1257     # Initialize the queue, and acquire the filelock.
1258     # This ensures no other process is working on the job queue.
1259     self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1260
1261     # Read serial file
1262     self._last_serial = jstore.ReadSerial()
1263     assert self._last_serial is not None, ("Serial file was modified between"
1264                                            " check in jstore and here")
1265
1266     # Get initial list of nodes
1267     self._nodes = dict((n.name, n.primary_ip)
1268                        for n in self.context.cfg.GetAllNodesInfo().values()
1269                        if n.master_candidate)
1270
1271     # Remove master node
1272     self._nodes.pop(self._my_hostname, None)
1273
1274     # TODO: Check consistency across nodes
1275
1276     self._queue_size = 0
1277     self._UpdateQueueSizeUnlocked()
1278     self._drained = jstore.CheckDrainFlag()
1279
1280     # Setup worker pool
1281     self._wpool = _JobQueueWorkerPool(self)
1282     try:
1283       self._InspectQueue()
1284     except:
1285       self._wpool.TerminateWorkers()
1286       raise
1287
1288   @locking.ssynchronized(_LOCK)
1289   @_RequireOpenQueue
1290   def _InspectQueue(self):
1291     """Loads the whole job queue and resumes unfinished jobs.
1292
1293     This function needs the lock here because WorkerPool.AddTask() may start a
1294     job while we're still doing our work.
1295
1296     """
1297     logging.info("Inspecting job queue")
1298
1299     restartjobs = []
1300
1301     all_job_ids = self._GetJobIDsUnlocked()
1302     jobs_count = len(all_job_ids)
1303     lastinfo = time.time()
1304     for idx, job_id in enumerate(all_job_ids):
1305       # Give an update every 1000 jobs or 10 seconds
1306       if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1307           idx == (jobs_count - 1)):
1308         logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1309                      idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1310         lastinfo = time.time()
1311
1312       job = self._LoadJobUnlocked(job_id)
1313
1314       # a failure in loading the job can cause 'None' to be returned
1315       if job is None:
1316         continue
1317
1318       status = job.CalcStatus()
1319
1320       if status == constants.JOB_STATUS_QUEUED:
1321         restartjobs.append(job)
1322
1323       elif status in (constants.JOB_STATUS_RUNNING,
1324                       constants.JOB_STATUS_WAITLOCK,
1325                       constants.JOB_STATUS_CANCELING):
1326         logging.warning("Unfinished job %s found: %s", job.id, job)
1327
1328         if status == constants.JOB_STATUS_WAITLOCK:
1329           # Restart job
1330           job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1331           restartjobs.append(job)
1332         else:
1333           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1334                                 "Unclean master daemon shutdown")
1335
1336         self.UpdateJobUnlocked(job)
1337
1338     if restartjobs:
1339       logging.info("Restarting %s jobs", len(restartjobs))
1340       self._EnqueueJobs(restartjobs)
1341
1342     logging.info("Job queue inspection finished")
1343
1344   @locking.ssynchronized(_LOCK)
1345   @_RequireOpenQueue
1346   def AddNode(self, node):
1347     """Register a new node with the queue.
1348
1349     @type node: L{objects.Node}
1350     @param node: the node object to be added
1351
1352     """
1353     node_name = node.name
1354     assert node_name != self._my_hostname
1355
1356     # Clean queue directory on added node
1357     result = rpc.RpcRunner.call_jobqueue_purge(node_name)
1358     msg = result.fail_msg
1359     if msg:
1360       logging.warning("Cannot cleanup queue directory on node %s: %s",
1361                       node_name, msg)
1362
1363     if not node.master_candidate:
1364       # remove if existing, ignoring errors
1365       self._nodes.pop(node_name, None)
1366       # and skip the replication of the job ids
1367       return
1368
1369     # Upload the whole queue excluding archived jobs
1370     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1371
1372     # Upload current serial file
1373     files.append(constants.JOB_QUEUE_SERIAL_FILE)
1374
1375     for file_name in files:
1376       # Read file content
1377       content = utils.ReadFile(file_name)
1378
1379       result = rpc.RpcRunner.call_jobqueue_update([node_name],
1380                                                   [node.primary_ip],
1381                                                   file_name, content)
1382       msg = result[node_name].fail_msg
1383       if msg:
1384         logging.error("Failed to upload file %s to node %s: %s",
1385                       file_name, node_name, msg)
1386
1387     self._nodes[node_name] = node.primary_ip
1388
1389   @locking.ssynchronized(_LOCK)
1390   @_RequireOpenQueue
1391   def RemoveNode(self, node_name):
1392     """Callback called when removing nodes from the cluster.
1393
1394     @type node_name: str
1395     @param node_name: the name of the node to remove
1396
1397     """
1398     self._nodes.pop(node_name, None)
1399
1400   @staticmethod
1401   def _CheckRpcResult(result, nodes, failmsg):
1402     """Verifies the status of an RPC call.
1403
1404     Since we aim to keep consistency should this node (the current
1405     master) fail, we will log errors if our rpc fail, and especially
1406     log the case when more than half of the nodes fails.
1407
1408     @param result: the data as returned from the rpc call
1409     @type nodes: list
1410     @param nodes: the list of nodes we made the call to
1411     @type failmsg: str
1412     @param failmsg: the identifier to be used for logging
1413
1414     """
1415     failed = []
1416     success = []
1417
1418     for node in nodes:
1419       msg = result[node].fail_msg
1420       if msg:
1421         failed.append(node)
1422         logging.error("RPC call %s (%s) failed on node %s: %s",
1423                       result[node].call, failmsg, node, msg)
1424       else:
1425         success.append(node)
1426
1427     # +1 for the master node
1428     if (len(success) + 1) < len(failed):
1429       # TODO: Handle failing nodes
1430       logging.error("More than half of the nodes failed")
1431
1432   def _GetNodeIp(self):
1433     """Helper for returning the node name/ip list.
1434
1435     @rtype: (list, list)
1436     @return: a tuple of two lists, the first one with the node
1437         names and the second one with the node addresses
1438
1439     """
1440     # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1441     name_list = self._nodes.keys()
1442     addr_list = [self._nodes[name] for name in name_list]
1443     return name_list, addr_list
1444
1445   def _UpdateJobQueueFile(self, file_name, data, replicate):
1446     """Writes a file locally and then replicates it to all nodes.
1447
1448     This function will replace the contents of a file on the local
1449     node and then replicate it to all the other nodes we have.
1450
1451     @type file_name: str
1452     @param file_name: the path of the file to be replicated
1453     @type data: str
1454     @param data: the new contents of the file
1455     @type replicate: boolean
1456     @param replicate: whether to spread the changes to the remote nodes
1457
1458     """
1459     getents = runtime.GetEnts()
1460     utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1461                     gid=getents.masterd_gid)
1462
1463     if replicate:
1464       names, addrs = self._GetNodeIp()
1465       result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1466       self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1467
1468   def _RenameFilesUnlocked(self, rename):
1469     """Renames a file locally and then replicate the change.
1470
1471     This function will rename a file in the local queue directory
1472     and then replicate this rename to all the other nodes we have.
1473
1474     @type rename: list of (old, new)
1475     @param rename: List containing tuples mapping old to new names
1476
1477     """
1478     # Rename them locally
1479     for old, new in rename:
1480       utils.RenameFile(old, new, mkdir=True)
1481
1482     # ... and on all nodes
1483     names, addrs = self._GetNodeIp()
1484     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1485     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1486
1487   @staticmethod
1488   def _FormatJobID(job_id):
1489     """Convert a job ID to string format.
1490
1491     Currently this just does C{str(job_id)} after performing some
1492     checks, but if we want to change the job id format this will
1493     abstract this change.
1494
1495     @type job_id: int or long
1496     @param job_id: the numeric job id
1497     @rtype: str
1498     @return: the formatted job id
1499
1500     """
1501     if not isinstance(job_id, (int, long)):
1502       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1503     if job_id < 0:
1504       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1505
1506     return str(job_id)
1507
1508   @classmethod
1509   def _GetArchiveDirectory(cls, job_id):
1510     """Returns the archive directory for a job.
1511
1512     @type job_id: str
1513     @param job_id: Job identifier
1514     @rtype: str
1515     @return: Directory name
1516
1517     """
1518     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
1519
1520   def _NewSerialsUnlocked(self, count):
1521     """Generates a new job identifier.
1522
1523     Job identifiers are unique during the lifetime of a cluster.
1524
1525     @type count: integer
1526     @param count: how many serials to return
1527     @rtype: str
1528     @return: a string representing the job identifier.
1529
1530     """
1531     assert count > 0
1532     # New number
1533     serial = self._last_serial + count
1534
1535     # Write to file
1536     self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1537                              "%s\n" % serial, True)
1538
1539     result = [self._FormatJobID(v)
1540               for v in range(self._last_serial, serial + 1)]
1541     # Keep it only if we were able to write the file
1542     self._last_serial = serial
1543
1544     return result
1545
1546   @staticmethod
1547   def _GetJobPath(job_id):
1548     """Returns the job file for a given job id.
1549
1550     @type job_id: str
1551     @param job_id: the job identifier
1552     @rtype: str
1553     @return: the path to the job file
1554
1555     """
1556     return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1557
1558   @classmethod
1559   def _GetArchivedJobPath(cls, job_id):
1560     """Returns the archived job file for a give job id.
1561
1562     @type job_id: str
1563     @param job_id: the job identifier
1564     @rtype: str
1565     @return: the path to the archived job file
1566
1567     """
1568     return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1569                           cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1570
1571   def _GetJobIDsUnlocked(self, sort=True):
1572     """Return all known job IDs.
1573
1574     The method only looks at disk because it's a requirement that all
1575     jobs are present on disk (so in the _memcache we don't have any
1576     extra IDs).
1577
1578     @type sort: boolean
1579     @param sort: perform sorting on the returned job ids
1580     @rtype: list
1581     @return: the list of job IDs
1582
1583     """
1584     jlist = []
1585     for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1586       m = self._RE_JOB_FILE.match(filename)
1587       if m:
1588         jlist.append(m.group(1))
1589     if sort:
1590       jlist = utils.NiceSort(jlist)
1591     return jlist
1592
1593   def _LoadJobUnlocked(self, job_id):
1594     """Loads a job from the disk or memory.
1595
1596     Given a job id, this will return the cached job object if
1597     existing, or try to load the job from the disk. If loading from
1598     disk, it will also add the job to the cache.
1599
1600     @param job_id: the job id
1601     @rtype: L{_QueuedJob} or None
1602     @return: either None or the job object
1603
1604     """
1605     job = self._memcache.get(job_id, None)
1606     if job:
1607       logging.debug("Found job %s in memcache", job_id)
1608       return job
1609
1610     try:
1611       job = self._LoadJobFromDisk(job_id, False)
1612       if job is None:
1613         return job
1614     except errors.JobFileCorrupted:
1615       old_path = self._GetJobPath(job_id)
1616       new_path = self._GetArchivedJobPath(job_id)
1617       if old_path == new_path:
1618         # job already archived (future case)
1619         logging.exception("Can't parse job %s", job_id)
1620       else:
1621         # non-archived case
1622         logging.exception("Can't parse job %s, will archive.", job_id)
1623         self._RenameFilesUnlocked([(old_path, new_path)])
1624       return None
1625
1626     self._memcache[job_id] = job
1627     logging.debug("Added job %s to the cache", job_id)
1628     return job
1629
1630   def _LoadJobFromDisk(self, job_id, try_archived):
1631     """Load the given job file from disk.
1632
1633     Given a job file, read, load and restore it in a _QueuedJob format.
1634
1635     @type job_id: string
1636     @param job_id: job identifier
1637     @type try_archived: bool
1638     @param try_archived: Whether to try loading an archived job
1639     @rtype: L{_QueuedJob} or None
1640     @return: either None or the job object
1641
1642     """
1643     path_functions = [self._GetJobPath]
1644
1645     if try_archived:
1646       path_functions.append(self._GetArchivedJobPath)
1647
1648     raw_data = None
1649
1650     for fn in path_functions:
1651       filepath = fn(job_id)
1652       logging.debug("Loading job from %s", filepath)
1653       try:
1654         raw_data = utils.ReadFile(filepath)
1655       except EnvironmentError, err:
1656         if err.errno != errno.ENOENT:
1657           raise
1658       else:
1659         break
1660
1661     if not raw_data:
1662       return None
1663
1664     try:
1665       data = serializer.LoadJson(raw_data)
1666       job = _QueuedJob.Restore(self, data)
1667     except Exception, err: # pylint: disable-msg=W0703
1668       raise errors.JobFileCorrupted(err)
1669
1670     return job
1671
1672   def SafeLoadJobFromDisk(self, job_id, try_archived):
1673     """Load the given job file from disk.
1674
1675     Given a job file, read, load and restore it in a _QueuedJob format.
1676     In case of error reading the job, it gets returned as None, and the
1677     exception is logged.
1678
1679     @type job_id: string
1680     @param job_id: job identifier
1681     @type try_archived: bool
1682     @param try_archived: Whether to try loading an archived job
1683     @rtype: L{_QueuedJob} or None
1684     @return: either None or the job object
1685
1686     """
1687     try:
1688       return self._LoadJobFromDisk(job_id, try_archived)
1689     except (errors.JobFileCorrupted, EnvironmentError):
1690       logging.exception("Can't load/parse job %s", job_id)
1691       return None
1692
1693   def _UpdateQueueSizeUnlocked(self):
1694     """Update the queue size.
1695
1696     """
1697     self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1698
1699   @locking.ssynchronized(_LOCK)
1700   @_RequireOpenQueue
1701   def SetDrainFlag(self, drain_flag):
1702     """Sets the drain flag for the queue.
1703
1704     @type drain_flag: boolean
1705     @param drain_flag: Whether to set or unset the drain flag
1706
1707     """
1708     jstore.SetDrainFlag(drain_flag)
1709
1710     self._drained = drain_flag
1711
1712     return True
1713
1714   @_RequireOpenQueue
1715   def _SubmitJobUnlocked(self, job_id, ops):
1716     """Create and store a new job.
1717
1718     This enters the job into our job queue and also puts it on the new
1719     queue, in order for it to be picked up by the queue processors.
1720
1721     @type job_id: job ID
1722     @param job_id: the job ID for the new job
1723     @type ops: list
1724     @param ops: The list of OpCodes that will become the new job.
1725     @rtype: L{_QueuedJob}
1726     @return: the job object to be queued
1727     @raise errors.JobQueueDrainError: if the job queue is marked for draining
1728     @raise errors.JobQueueFull: if the job queue has too many jobs in it
1729     @raise errors.GenericError: If an opcode is not valid
1730
1731     """
1732     # Ok when sharing the big job queue lock, as the drain file is created when
1733     # the lock is exclusive.
1734     if self._drained:
1735       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1736
1737     if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1738       raise errors.JobQueueFull()
1739
1740     job = _QueuedJob(self, job_id, ops)
1741
1742     # Check priority
1743     for idx, op in enumerate(job.ops):
1744       if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
1745         allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
1746         raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
1747                                   " are %s" % (idx, op.priority, allowed))
1748
1749     # Write to disk
1750     self.UpdateJobUnlocked(job)
1751
1752     self._queue_size += 1
1753
1754     logging.debug("Adding new job %s to the cache", job_id)
1755     self._memcache[job_id] = job
1756
1757     return job
1758
1759   @locking.ssynchronized(_LOCK)
1760   @_RequireOpenQueue
1761   def SubmitJob(self, ops):
1762     """Create and store a new job.
1763
1764     @see: L{_SubmitJobUnlocked}
1765
1766     """
1767     job_id = self._NewSerialsUnlocked(1)[0]
1768     self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
1769     return job_id
1770
1771   @locking.ssynchronized(_LOCK)
1772   @_RequireOpenQueue
1773   def SubmitManyJobs(self, jobs):
1774     """Create and store multiple jobs.
1775
1776     @see: L{_SubmitJobUnlocked}
1777
1778     """
1779     results = []
1780     added_jobs = []
1781     all_job_ids = self._NewSerialsUnlocked(len(jobs))
1782     for job_id, ops in zip(all_job_ids, jobs):
1783       try:
1784         added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
1785         status = True
1786         data = job_id
1787       except errors.GenericError, err:
1788         data = ("%s; opcodes %s" %
1789                 (err, utils.CommaJoin(op.Summary() for op in ops)))
1790         status = False
1791       results.append((status, data))
1792
1793     self._EnqueueJobs(added_jobs)
1794
1795     return results
1796
1797   def _EnqueueJobs(self, jobs):
1798     """Helper function to add jobs to worker pool's queue.
1799
1800     @type jobs: list
1801     @param jobs: List of all jobs
1802
1803     """
1804     self._wpool.AddManyTasks([(job, ) for job in jobs],
1805                              priority=[job.CalcPriority() for job in jobs])
1806
1807   @_RequireOpenQueue
1808   def UpdateJobUnlocked(self, job, replicate=True):
1809     """Update a job's on disk storage.
1810
1811     After a job has been modified, this function needs to be called in
1812     order to write the changes to disk and replicate them to the other
1813     nodes.
1814
1815     @type job: L{_QueuedJob}
1816     @param job: the changed job
1817     @type replicate: boolean
1818     @param replicate: whether to replicate the change to remote nodes
1819
1820     """
1821     if __debug__:
1822       finalized = job.CalcStatus() in constants.JOBS_FINALIZED
1823       assert (finalized ^ (job.end_timestamp is None))
1824
1825     filename = self._GetJobPath(job.id)
1826     data = serializer.DumpJson(job.Serialize(), indent=False)
1827     logging.debug("Writing job %s to %s", job.id, filename)
1828     self._UpdateJobQueueFile(filename, data, replicate)
1829
1830   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1831                         timeout):
1832     """Waits for changes in a job.
1833
1834     @type job_id: string
1835     @param job_id: Job identifier
1836     @type fields: list of strings
1837     @param fields: Which fields to check for changes
1838     @type prev_job_info: list or None
1839     @param prev_job_info: Last job information returned
1840     @type prev_log_serial: int
1841     @param prev_log_serial: Last job message serial number
1842     @type timeout: float
1843     @param timeout: maximum time to wait in seconds
1844     @rtype: tuple (job info, log entries)
1845     @return: a tuple of the job information as required via
1846         the fields parameter, and the log entries as a list
1847
1848         if the job has not changed and the timeout has expired,
1849         we instead return a special value,
1850         L{constants.JOB_NOTCHANGED}, which should be interpreted
1851         as such by the clients
1852
1853     """
1854     load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, False)
1855
1856     helper = _WaitForJobChangesHelper()
1857
1858     return helper(self._GetJobPath(job_id), load_fn,
1859                   fields, prev_job_info, prev_log_serial, timeout)
1860
1861   @locking.ssynchronized(_LOCK)
1862   @_RequireOpenQueue
1863   def CancelJob(self, job_id):
1864     """Cancels a job.
1865
1866     This will only succeed if the job has not started yet.
1867
1868     @type job_id: string
1869     @param job_id: job ID of job to be cancelled.
1870
1871     """
1872     logging.info("Cancelling job %s", job_id)
1873
1874     job = self._LoadJobUnlocked(job_id)
1875     if not job:
1876       logging.debug("Job %s not found", job_id)
1877       return (False, "Job %s not found" % job_id)
1878
1879     (success, msg) = job.Cancel()
1880
1881     if success:
1882       # If the job was finalized (e.g. cancelled), this is the final write
1883       # allowed. The job can be archived anytime.
1884       self.UpdateJobUnlocked(job)
1885
1886     return (success, msg)
1887
1888   @_RequireOpenQueue
1889   def _ArchiveJobsUnlocked(self, jobs):
1890     """Archives jobs.
1891
1892     @type jobs: list of L{_QueuedJob}
1893     @param jobs: Job objects
1894     @rtype: int
1895     @return: Number of archived jobs
1896
1897     """
1898     archive_jobs = []
1899     rename_files = []
1900     for job in jobs:
1901       if job.CalcStatus() not in constants.JOBS_FINALIZED:
1902         logging.debug("Job %s is not yet done", job.id)
1903         continue
1904
1905       archive_jobs.append(job)
1906
1907       old = self._GetJobPath(job.id)
1908       new = self._GetArchivedJobPath(job.id)
1909       rename_files.append((old, new))
1910
1911     # TODO: What if 1..n files fail to rename?
1912     self._RenameFilesUnlocked(rename_files)
1913
1914     logging.debug("Successfully archived job(s) %s",
1915                   utils.CommaJoin(job.id for job in archive_jobs))
1916
1917     # Since we haven't quite checked, above, if we succeeded or failed renaming
1918     # the files, we update the cached queue size from the filesystem. When we
1919     # get around to fix the TODO: above, we can use the number of actually
1920     # archived jobs to fix this.
1921     self._UpdateQueueSizeUnlocked()
1922     return len(archive_jobs)
1923
1924   @locking.ssynchronized(_LOCK)
1925   @_RequireOpenQueue
1926   def ArchiveJob(self, job_id):
1927     """Archives a job.
1928
1929     This is just a wrapper over L{_ArchiveJobsUnlocked}.
1930
1931     @type job_id: string
1932     @param job_id: Job ID of job to be archived.
1933     @rtype: bool
1934     @return: Whether job was archived
1935
1936     """
1937     logging.info("Archiving job %s", job_id)
1938
1939     job = self._LoadJobUnlocked(job_id)
1940     if not job:
1941       logging.debug("Job %s not found", job_id)
1942       return False
1943
1944     return self._ArchiveJobsUnlocked([job]) == 1
1945
1946   @locking.ssynchronized(_LOCK)
1947   @_RequireOpenQueue
1948   def AutoArchiveJobs(self, age, timeout):
1949     """Archives all jobs based on age.
1950
1951     The method will archive all jobs which are older than the age
1952     parameter. For jobs that don't have an end timestamp, the start
1953     timestamp will be considered. The special '-1' age will cause
1954     archival of all jobs (that are not running or queued).
1955
1956     @type age: int
1957     @param age: the minimum age in seconds
1958
1959     """
1960     logging.info("Archiving jobs with age more than %s seconds", age)
1961
1962     now = time.time()
1963     end_time = now + timeout
1964     archived_count = 0
1965     last_touched = 0
1966
1967     all_job_ids = self._GetJobIDsUnlocked()
1968     pending = []
1969     for idx, job_id in enumerate(all_job_ids):
1970       last_touched = idx + 1
1971
1972       # Not optimal because jobs could be pending
1973       # TODO: Measure average duration for job archival and take number of
1974       # pending jobs into account.
1975       if time.time() > end_time:
1976         break
1977
1978       # Returns None if the job failed to load
1979       job = self._LoadJobUnlocked(job_id)
1980       if job:
1981         if job.end_timestamp is None:
1982           if job.start_timestamp is None:
1983             job_age = job.received_timestamp
1984           else:
1985             job_age = job.start_timestamp
1986         else:
1987           job_age = job.end_timestamp
1988
1989         if age == -1 or now - job_age[0] > age:
1990           pending.append(job)
1991
1992           # Archive 10 jobs at a time
1993           if len(pending) >= 10:
1994             archived_count += self._ArchiveJobsUnlocked(pending)
1995             pending = []
1996
1997     if pending:
1998       archived_count += self._ArchiveJobsUnlocked(pending)
1999
2000     return (archived_count, len(all_job_ids) - last_touched)
2001
2002   def QueryJobs(self, job_ids, fields):
2003     """Returns a list of jobs in queue.
2004
2005     @type job_ids: list
2006     @param job_ids: sequence of job identifiers or None for all
2007     @type fields: list
2008     @param fields: names of fields to return
2009     @rtype: list
2010     @return: list one element per job, each element being list with
2011         the requested fields
2012
2013     """
2014     jobs = []
2015     list_all = False
2016     if not job_ids:
2017       # Since files are added to/removed from the queue atomically, there's no
2018       # risk of getting the job ids in an inconsistent state.
2019       job_ids = self._GetJobIDsUnlocked()
2020       list_all = True
2021
2022     for job_id in job_ids:
2023       job = self.SafeLoadJobFromDisk(job_id, True)
2024       if job is not None:
2025         jobs.append(job.GetInfo(fields))
2026       elif not list_all:
2027         jobs.append(None)
2028
2029     return jobs
2030
2031   @locking.ssynchronized(_LOCK)
2032   @_RequireOpenQueue
2033   def Shutdown(self):
2034     """Stops the job queue.
2035
2036     This shutdowns all the worker threads an closes the queue.
2037
2038     """
2039     self._wpool.TerminateWorkers()
2040
2041     self._queue_filelock.Close()
2042     self._queue_filelock = None