Switch job IDs to numeric
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import logging
33 import errno
34 import time
35 import weakref
36 import threading
37 import itertools
38
39 try:
40   # pylint: disable=E0611
41   from pyinotify import pyinotify
42 except ImportError:
43   import pyinotify
44
45 from ganeti import asyncnotifier
46 from ganeti import constants
47 from ganeti import serializer
48 from ganeti import workerpool
49 from ganeti import locking
50 from ganeti import opcodes
51 from ganeti import errors
52 from ganeti import mcpu
53 from ganeti import utils
54 from ganeti import jstore
55 from ganeti import rpc
56 from ganeti import runtime
57 from ganeti import netutils
58 from ganeti import compat
59 from ganeti import ht
60 from ganeti import query
61 from ganeti import qlang
62
63
64 JOBQUEUE_THREADS = 25
65
66 # member lock names to be passed to @ssynchronized decorator
67 _LOCK = "_lock"
68 _QUEUE = "_queue"
69
70
71 class CancelJob(Exception):
72   """Special exception to cancel a job.
73
74   """
75
76
77 def TimeStampNow():
78   """Returns the current timestamp.
79
80   @rtype: tuple
81   @return: the current time in the (seconds, microseconds) format
82
83   """
84   return utils.SplitTime(time.time())
85
86
87 class _SimpleJobQuery:
88   """Wrapper for job queries.
89
90   Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.
91
92   """
93   def __init__(self, fields):
94     """Initializes this class.
95
96     """
97     self._query = query.Query(query.JOB_FIELDS, fields)
98
99   def __call__(self, job):
100     """Executes a job query using cached field list.
101
102     """
103     return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
104
105
106 class _QueuedOpCode(object):
107   """Encapsulates an opcode object.
108
109   @ivar log: holds the execution log and consists of tuples
110   of the form C{(log_serial, timestamp, level, message)}
111   @ivar input: the OpCode we encapsulate
112   @ivar status: the current status
113   @ivar result: the result of the LU execution
114   @ivar start_timestamp: timestamp for the start of the execution
115   @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
116   @ivar stop_timestamp: timestamp for the end of the execution
117
118   """
119   __slots__ = ["input", "status", "result", "log", "priority",
120                "start_timestamp", "exec_timestamp", "end_timestamp",
121                "__weakref__"]
122
123   def __init__(self, op):
124     """Initializes instances of this class.
125
126     @type op: L{opcodes.OpCode}
127     @param op: the opcode we encapsulate
128
129     """
130     self.input = op
131     self.status = constants.OP_STATUS_QUEUED
132     self.result = None
133     self.log = []
134     self.start_timestamp = None
135     self.exec_timestamp = None
136     self.end_timestamp = None
137
138     # Get initial priority (it might change during the lifetime of this opcode)
139     self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
140
141   @classmethod
142   def Restore(cls, state):
143     """Restore the _QueuedOpCode from the serialized form.
144
145     @type state: dict
146     @param state: the serialized state
147     @rtype: _QueuedOpCode
148     @return: a new _QueuedOpCode instance
149
150     """
151     obj = _QueuedOpCode.__new__(cls)
152     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
153     obj.status = state["status"]
154     obj.result = state["result"]
155     obj.log = state["log"]
156     obj.start_timestamp = state.get("start_timestamp", None)
157     obj.exec_timestamp = state.get("exec_timestamp", None)
158     obj.end_timestamp = state.get("end_timestamp", None)
159     obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
160     return obj
161
162   def Serialize(self):
163     """Serializes this _QueuedOpCode.
164
165     @rtype: dict
166     @return: the dictionary holding the serialized state
167
168     """
169     return {
170       "input": self.input.__getstate__(),
171       "status": self.status,
172       "result": self.result,
173       "log": self.log,
174       "start_timestamp": self.start_timestamp,
175       "exec_timestamp": self.exec_timestamp,
176       "end_timestamp": self.end_timestamp,
177       "priority": self.priority,
178       }
179
180
181 class _QueuedJob(object):
182   """In-memory job representation.
183
184   This is what we use to track the user-submitted jobs. Locking must
185   be taken care of by users of this class.
186
187   @type queue: L{JobQueue}
188   @ivar queue: the parent queue
189   @ivar id: the job ID
190   @type ops: list
191   @ivar ops: the list of _QueuedOpCode that constitute the job
192   @type log_serial: int
193   @ivar log_serial: holds the index for the next log entry
194   @ivar received_timestamp: the timestamp for when the job was received
195   @ivar start_timestmap: the timestamp for start of execution
196   @ivar end_timestamp: the timestamp for end of execution
197   @ivar writable: Whether the job is allowed to be modified
198
199   """
200   # pylint: disable=W0212
201   __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
202                "received_timestamp", "start_timestamp", "end_timestamp",
203                "__weakref__", "processor_lock", "writable"]
204
205   def __init__(self, queue, job_id, ops, writable):
206     """Constructor for the _QueuedJob.
207
208     @type queue: L{JobQueue}
209     @param queue: our parent queue
210     @type job_id: job_id
211     @param job_id: our job id
212     @type ops: list
213     @param ops: the list of opcodes we hold, which will be encapsulated
214         in _QueuedOpCodes
215     @type writable: bool
216     @param writable: Whether job can be modified
217
218     """
219     if not ops:
220       raise errors.GenericError("A job needs at least one opcode")
221
222     self.queue = queue
223     self.id = int(job_id)
224     self.ops = [_QueuedOpCode(op) for op in ops]
225     self.log_serial = 0
226     self.received_timestamp = TimeStampNow()
227     self.start_timestamp = None
228     self.end_timestamp = None
229
230     self._InitInMemory(self, writable)
231
232   @staticmethod
233   def _InitInMemory(obj, writable):
234     """Initializes in-memory variables.
235
236     """
237     obj.writable = writable
238     obj.ops_iter = None
239     obj.cur_opctx = None
240
241     # Read-only jobs are not processed and therefore don't need a lock
242     if writable:
243       obj.processor_lock = threading.Lock()
244     else:
245       obj.processor_lock = None
246
247   def __repr__(self):
248     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
249               "id=%s" % self.id,
250               "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
251
252     return "<%s at %#x>" % (" ".join(status), id(self))
253
254   @classmethod
255   def Restore(cls, queue, state, writable):
256     """Restore a _QueuedJob from serialized state:
257
258     @type queue: L{JobQueue}
259     @param queue: to which queue the restored job belongs
260     @type state: dict
261     @param state: the serialized state
262     @type writable: bool
263     @param writable: Whether job can be modified
264     @rtype: _JobQueue
265     @return: the restored _JobQueue instance
266
267     """
268     obj = _QueuedJob.__new__(cls)
269     obj.queue = queue
270     obj.id = int(state["id"])
271     obj.received_timestamp = state.get("received_timestamp", None)
272     obj.start_timestamp = state.get("start_timestamp", None)
273     obj.end_timestamp = state.get("end_timestamp", None)
274
275     obj.ops = []
276     obj.log_serial = 0
277     for op_state in state["ops"]:
278       op = _QueuedOpCode.Restore(op_state)
279       for log_entry in op.log:
280         obj.log_serial = max(obj.log_serial, log_entry[0])
281       obj.ops.append(op)
282
283     cls._InitInMemory(obj, writable)
284
285     return obj
286
287   def Serialize(self):
288     """Serialize the _JobQueue instance.
289
290     @rtype: dict
291     @return: the serialized state
292
293     """
294     return {
295       "id": self.id,
296       "ops": [op.Serialize() for op in self.ops],
297       "start_timestamp": self.start_timestamp,
298       "end_timestamp": self.end_timestamp,
299       "received_timestamp": self.received_timestamp,
300       }
301
302   def CalcStatus(self):
303     """Compute the status of this job.
304
305     This function iterates over all the _QueuedOpCodes in the job and
306     based on their status, computes the job status.
307
308     The algorithm is:
309       - if we find a cancelled, or finished with error, the job
310         status will be the same
311       - otherwise, the last opcode with the status one of:
312           - waitlock
313           - canceling
314           - running
315
316         will determine the job status
317
318       - otherwise, it means either all opcodes are queued, or success,
319         and the job status will be the same
320
321     @return: the job status
322
323     """
324     status = constants.JOB_STATUS_QUEUED
325
326     all_success = True
327     for op in self.ops:
328       if op.status == constants.OP_STATUS_SUCCESS:
329         continue
330
331       all_success = False
332
333       if op.status == constants.OP_STATUS_QUEUED:
334         pass
335       elif op.status == constants.OP_STATUS_WAITING:
336         status = constants.JOB_STATUS_WAITING
337       elif op.status == constants.OP_STATUS_RUNNING:
338         status = constants.JOB_STATUS_RUNNING
339       elif op.status == constants.OP_STATUS_CANCELING:
340         status = constants.JOB_STATUS_CANCELING
341         break
342       elif op.status == constants.OP_STATUS_ERROR:
343         status = constants.JOB_STATUS_ERROR
344         # The whole job fails if one opcode failed
345         break
346       elif op.status == constants.OP_STATUS_CANCELED:
347         status = constants.OP_STATUS_CANCELED
348         break
349
350     if all_success:
351       status = constants.JOB_STATUS_SUCCESS
352
353     return status
354
355   def CalcPriority(self):
356     """Gets the current priority for this job.
357
358     Only unfinished opcodes are considered. When all are done, the default
359     priority is used.
360
361     @rtype: int
362
363     """
364     priorities = [op.priority for op in self.ops
365                   if op.status not in constants.OPS_FINALIZED]
366
367     if not priorities:
368       # All opcodes are done, assume default priority
369       return constants.OP_PRIO_DEFAULT
370
371     return min(priorities)
372
373   def GetLogEntries(self, newer_than):
374     """Selectively returns the log entries.
375
376     @type newer_than: None or int
377     @param newer_than: if this is None, return all log entries,
378         otherwise return only the log entries with serial higher
379         than this value
380     @rtype: list
381     @return: the list of the log entries selected
382
383     """
384     if newer_than is None:
385       serial = -1
386     else:
387       serial = newer_than
388
389     entries = []
390     for op in self.ops:
391       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
392
393     return entries
394
395   def GetInfo(self, fields):
396     """Returns information about a job.
397
398     @type fields: list
399     @param fields: names of fields to return
400     @rtype: list
401     @return: list with one element for each field
402     @raise errors.OpExecError: when an invalid field
403         has been passed
404
405     """
406     return _SimpleJobQuery(fields)(self)
407
408   def MarkUnfinishedOps(self, status, result):
409     """Mark unfinished opcodes with a given status and result.
410
411     This is an utility function for marking all running or waiting to
412     be run opcodes with a given status. Opcodes which are already
413     finalised are not changed.
414
415     @param status: a given opcode status
416     @param result: the opcode result
417
418     """
419     not_marked = True
420     for op in self.ops:
421       if op.status in constants.OPS_FINALIZED:
422         assert not_marked, "Finalized opcodes found after non-finalized ones"
423         continue
424       op.status = status
425       op.result = result
426       not_marked = False
427
428   def Finalize(self):
429     """Marks the job as finalized.
430
431     """
432     self.end_timestamp = TimeStampNow()
433
434   def Cancel(self):
435     """Marks job as canceled/-ing if possible.
436
437     @rtype: tuple; (bool, string)
438     @return: Boolean describing whether job was successfully canceled or marked
439       as canceling and a text message
440
441     """
442     status = self.CalcStatus()
443
444     if status == constants.JOB_STATUS_QUEUED:
445       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
446                              "Job canceled by request")
447       self.Finalize()
448       return (True, "Job %s canceled" % self.id)
449
450     elif status == constants.JOB_STATUS_WAITING:
451       # The worker will notice the new status and cancel the job
452       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
453       return (True, "Job %s will be canceled" % self.id)
454
455     else:
456       logging.debug("Job %s is no longer waiting in the queue", self.id)
457       return (False, "Job %s is no longer waiting in the queue" % self.id)
458
459
460 class _OpExecCallbacks(mcpu.OpExecCbBase):
461   def __init__(self, queue, job, op):
462     """Initializes this class.
463
464     @type queue: L{JobQueue}
465     @param queue: Job queue
466     @type job: L{_QueuedJob}
467     @param job: Job object
468     @type op: L{_QueuedOpCode}
469     @param op: OpCode
470
471     """
472     assert queue, "Queue is missing"
473     assert job, "Job is missing"
474     assert op, "Opcode is missing"
475
476     self._queue = queue
477     self._job = job
478     self._op = op
479
480   def _CheckCancel(self):
481     """Raises an exception to cancel the job if asked to.
482
483     """
484     # Cancel here if we were asked to
485     if self._op.status == constants.OP_STATUS_CANCELING:
486       logging.debug("Canceling opcode")
487       raise CancelJob()
488
489   @locking.ssynchronized(_QUEUE, shared=1)
490   def NotifyStart(self):
491     """Mark the opcode as running, not lock-waiting.
492
493     This is called from the mcpu code as a notifier function, when the LU is
494     finally about to start the Exec() method. Of course, to have end-user
495     visible results, the opcode must be initially (before calling into
496     Processor.ExecOpCode) set to OP_STATUS_WAITING.
497
498     """
499     assert self._op in self._job.ops
500     assert self._op.status in (constants.OP_STATUS_WAITING,
501                                constants.OP_STATUS_CANCELING)
502
503     # Cancel here if we were asked to
504     self._CheckCancel()
505
506     logging.debug("Opcode is now running")
507
508     self._op.status = constants.OP_STATUS_RUNNING
509     self._op.exec_timestamp = TimeStampNow()
510
511     # And finally replicate the job status
512     self._queue.UpdateJobUnlocked(self._job)
513
514   @locking.ssynchronized(_QUEUE, shared=1)
515   def _AppendFeedback(self, timestamp, log_type, log_msg):
516     """Internal feedback append function, with locks
517
518     """
519     self._job.log_serial += 1
520     self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
521     self._queue.UpdateJobUnlocked(self._job, replicate=False)
522
523   def Feedback(self, *args):
524     """Append a log entry.
525
526     """
527     assert len(args) < 3
528
529     if len(args) == 1:
530       log_type = constants.ELOG_MESSAGE
531       log_msg = args[0]
532     else:
533       (log_type, log_msg) = args
534
535     # The time is split to make serialization easier and not lose
536     # precision.
537     timestamp = utils.SplitTime(time.time())
538     self._AppendFeedback(timestamp, log_type, log_msg)
539
540   def CheckCancel(self):
541     """Check whether job has been cancelled.
542
543     """
544     assert self._op.status in (constants.OP_STATUS_WAITING,
545                                constants.OP_STATUS_CANCELING)
546
547     # Cancel here if we were asked to
548     self._CheckCancel()
549
550   def SubmitManyJobs(self, jobs):
551     """Submits jobs for processing.
552
553     See L{JobQueue.SubmitManyJobs}.
554
555     """
556     # Locking is done in job queue
557     return self._queue.SubmitManyJobs(jobs)
558
559
560 class _JobChangesChecker(object):
561   def __init__(self, fields, prev_job_info, prev_log_serial):
562     """Initializes this class.
563
564     @type fields: list of strings
565     @param fields: Fields requested by LUXI client
566     @type prev_job_info: string
567     @param prev_job_info: previous job info, as passed by the LUXI client
568     @type prev_log_serial: string
569     @param prev_log_serial: previous job serial, as passed by the LUXI client
570
571     """
572     self._squery = _SimpleJobQuery(fields)
573     self._prev_job_info = prev_job_info
574     self._prev_log_serial = prev_log_serial
575
576   def __call__(self, job):
577     """Checks whether job has changed.
578
579     @type job: L{_QueuedJob}
580     @param job: Job object
581
582     """
583     assert not job.writable, "Expected read-only job"
584
585     status = job.CalcStatus()
586     job_info = self._squery(job)
587     log_entries = job.GetLogEntries(self._prev_log_serial)
588
589     # Serializing and deserializing data can cause type changes (e.g. from
590     # tuple to list) or precision loss. We're doing it here so that we get
591     # the same modifications as the data received from the client. Without
592     # this, the comparison afterwards might fail without the data being
593     # significantly different.
594     # TODO: we just deserialized from disk, investigate how to make sure that
595     # the job info and log entries are compatible to avoid this further step.
596     # TODO: Doing something like in testutils.py:UnifyValueType might be more
597     # efficient, though floats will be tricky
598     job_info = serializer.LoadJson(serializer.DumpJson(job_info))
599     log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
600
601     # Don't even try to wait if the job is no longer running, there will be
602     # no changes.
603     if (status not in (constants.JOB_STATUS_QUEUED,
604                        constants.JOB_STATUS_RUNNING,
605                        constants.JOB_STATUS_WAITING) or
606         job_info != self._prev_job_info or
607         (log_entries and self._prev_log_serial != log_entries[0][0])):
608       logging.debug("Job %s changed", job.id)
609       return (job_info, log_entries)
610
611     return None
612
613
614 class _JobFileChangesWaiter(object):
615   def __init__(self, filename):
616     """Initializes this class.
617
618     @type filename: string
619     @param filename: Path to job file
620     @raises errors.InotifyError: if the notifier cannot be setup
621
622     """
623     self._wm = pyinotify.WatchManager()
624     self._inotify_handler = \
625       asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
626     self._notifier = \
627       pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
628     try:
629       self._inotify_handler.enable()
630     except Exception:
631       # pyinotify doesn't close file descriptors automatically
632       self._notifier.stop()
633       raise
634
635   def _OnInotify(self, notifier_enabled):
636     """Callback for inotify.
637
638     """
639     if not notifier_enabled:
640       self._inotify_handler.enable()
641
642   def Wait(self, timeout):
643     """Waits for the job file to change.
644
645     @type timeout: float
646     @param timeout: Timeout in seconds
647     @return: Whether there have been events
648
649     """
650     assert timeout >= 0
651     have_events = self._notifier.check_events(timeout * 1000)
652     if have_events:
653       self._notifier.read_events()
654     self._notifier.process_events()
655     return have_events
656
657   def Close(self):
658     """Closes underlying notifier and its file descriptor.
659
660     """
661     self._notifier.stop()
662
663
664 class _JobChangesWaiter(object):
665   def __init__(self, filename):
666     """Initializes this class.
667
668     @type filename: string
669     @param filename: Path to job file
670
671     """
672     self._filewaiter = None
673     self._filename = filename
674
675   def Wait(self, timeout):
676     """Waits for a job to change.
677
678     @type timeout: float
679     @param timeout: Timeout in seconds
680     @return: Whether there have been events
681
682     """
683     if self._filewaiter:
684       return self._filewaiter.Wait(timeout)
685
686     # Lazy setup: Avoid inotify setup cost when job file has already changed.
687     # If this point is reached, return immediately and let caller check the job
688     # file again in case there were changes since the last check. This avoids a
689     # race condition.
690     self._filewaiter = _JobFileChangesWaiter(self._filename)
691
692     return True
693
694   def Close(self):
695     """Closes underlying waiter.
696
697     """
698     if self._filewaiter:
699       self._filewaiter.Close()
700
701
702 class _WaitForJobChangesHelper(object):
703   """Helper class using inotify to wait for changes in a job file.
704
705   This class takes a previous job status and serial, and alerts the client when
706   the current job status has changed.
707
708   """
709   @staticmethod
710   def _CheckForChanges(counter, job_load_fn, check_fn):
711     if counter.next() > 0:
712       # If this isn't the first check the job is given some more time to change
713       # again. This gives better performance for jobs generating many
714       # changes/messages.
715       time.sleep(0.1)
716
717     job = job_load_fn()
718     if not job:
719       raise errors.JobLost()
720
721     result = check_fn(job)
722     if result is None:
723       raise utils.RetryAgain()
724
725     return result
726
727   def __call__(self, filename, job_load_fn,
728                fields, prev_job_info, prev_log_serial, timeout):
729     """Waits for changes on a job.
730
731     @type filename: string
732     @param filename: File on which to wait for changes
733     @type job_load_fn: callable
734     @param job_load_fn: Function to load job
735     @type fields: list of strings
736     @param fields: Which fields to check for changes
737     @type prev_job_info: list or None
738     @param prev_job_info: Last job information returned
739     @type prev_log_serial: int
740     @param prev_log_serial: Last job message serial number
741     @type timeout: float
742     @param timeout: maximum time to wait in seconds
743
744     """
745     counter = itertools.count()
746     try:
747       check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
748       waiter = _JobChangesWaiter(filename)
749       try:
750         return utils.Retry(compat.partial(self._CheckForChanges,
751                                           counter, job_load_fn, check_fn),
752                            utils.RETRY_REMAINING_TIME, timeout,
753                            wait_fn=waiter.Wait)
754       finally:
755         waiter.Close()
756     except (errors.InotifyError, errors.JobLost):
757       return None
758     except utils.RetryTimeout:
759       return constants.JOB_NOTCHANGED
760
761
762 def _EncodeOpError(err):
763   """Encodes an error which occurred while processing an opcode.
764
765   """
766   if isinstance(err, errors.GenericError):
767     to_encode = err
768   else:
769     to_encode = errors.OpExecError(str(err))
770
771   return errors.EncodeException(to_encode)
772
773
774 class _TimeoutStrategyWrapper:
775   def __init__(self, fn):
776     """Initializes this class.
777
778     """
779     self._fn = fn
780     self._next = None
781
782   def _Advance(self):
783     """Gets the next timeout if necessary.
784
785     """
786     if self._next is None:
787       self._next = self._fn()
788
789   def Peek(self):
790     """Returns the next timeout.
791
792     """
793     self._Advance()
794     return self._next
795
796   def Next(self):
797     """Returns the current timeout and advances the internal state.
798
799     """
800     self._Advance()
801     result = self._next
802     self._next = None
803     return result
804
805
806 class _OpExecContext:
807   def __init__(self, op, index, log_prefix, timeout_strategy_factory):
808     """Initializes this class.
809
810     """
811     self.op = op
812     self.index = index
813     self.log_prefix = log_prefix
814     self.summary = op.input.Summary()
815
816     # Create local copy to modify
817     if getattr(op.input, opcodes.DEPEND_ATTR, None):
818       self.jobdeps = op.input.depends[:]
819     else:
820       self.jobdeps = None
821
822     self._timeout_strategy_factory = timeout_strategy_factory
823     self._ResetTimeoutStrategy()
824
825   def _ResetTimeoutStrategy(self):
826     """Creates a new timeout strategy.
827
828     """
829     self._timeout_strategy = \
830       _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
831
832   def CheckPriorityIncrease(self):
833     """Checks whether priority can and should be increased.
834
835     Called when locks couldn't be acquired.
836
837     """
838     op = self.op
839
840     # Exhausted all retries and next round should not use blocking acquire
841     # for locks?
842     if (self._timeout_strategy.Peek() is None and
843         op.priority > constants.OP_PRIO_HIGHEST):
844       logging.debug("Increasing priority")
845       op.priority -= 1
846       self._ResetTimeoutStrategy()
847       return True
848
849     return False
850
851   def GetNextLockTimeout(self):
852     """Returns the next lock acquire timeout.
853
854     """
855     return self._timeout_strategy.Next()
856
857
858 class _JobProcessor(object):
859   (DEFER,
860    WAITDEP,
861    FINISHED) = range(1, 4)
862
863   def __init__(self, queue, opexec_fn, job,
864                _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
865     """Initializes this class.
866
867     """
868     self.queue = queue
869     self.opexec_fn = opexec_fn
870     self.job = job
871     self._timeout_strategy_factory = _timeout_strategy_factory
872
873   @staticmethod
874   def _FindNextOpcode(job, timeout_strategy_factory):
875     """Locates the next opcode to run.
876
877     @type job: L{_QueuedJob}
878     @param job: Job object
879     @param timeout_strategy_factory: Callable to create new timeout strategy
880
881     """
882     # Create some sort of a cache to speed up locating next opcode for future
883     # lookups
884     # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
885     # pending and one for processed ops.
886     if job.ops_iter is None:
887       job.ops_iter = enumerate(job.ops)
888
889     # Find next opcode to run
890     while True:
891       try:
892         (idx, op) = job.ops_iter.next()
893       except StopIteration:
894         raise errors.ProgrammerError("Called for a finished job")
895
896       if op.status == constants.OP_STATUS_RUNNING:
897         # Found an opcode already marked as running
898         raise errors.ProgrammerError("Called for job marked as running")
899
900       opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
901                              timeout_strategy_factory)
902
903       if op.status not in constants.OPS_FINALIZED:
904         return opctx
905
906       # This is a job that was partially completed before master daemon
907       # shutdown, so it can be expected that some opcodes are already
908       # completed successfully (if any did error out, then the whole job
909       # should have been aborted and not resubmitted for processing).
910       logging.info("%s: opcode %s already processed, skipping",
911                    opctx.log_prefix, opctx.summary)
912
913   @staticmethod
914   def _MarkWaitlock(job, op):
915     """Marks an opcode as waiting for locks.
916
917     The job's start timestamp is also set if necessary.
918
919     @type job: L{_QueuedJob}
920     @param job: Job object
921     @type op: L{_QueuedOpCode}
922     @param op: Opcode object
923
924     """
925     assert op in job.ops
926     assert op.status in (constants.OP_STATUS_QUEUED,
927                          constants.OP_STATUS_WAITING)
928
929     update = False
930
931     op.result = None
932
933     if op.status == constants.OP_STATUS_QUEUED:
934       op.status = constants.OP_STATUS_WAITING
935       update = True
936
937     if op.start_timestamp is None:
938       op.start_timestamp = TimeStampNow()
939       update = True
940
941     if job.start_timestamp is None:
942       job.start_timestamp = op.start_timestamp
943       update = True
944
945     assert op.status == constants.OP_STATUS_WAITING
946
947     return update
948
949   @staticmethod
950   def _CheckDependencies(queue, job, opctx):
951     """Checks if an opcode has dependencies and if so, processes them.
952
953     @type queue: L{JobQueue}
954     @param queue: Queue object
955     @type job: L{_QueuedJob}
956     @param job: Job object
957     @type opctx: L{_OpExecContext}
958     @param opctx: Opcode execution context
959     @rtype: bool
960     @return: Whether opcode will be re-scheduled by dependency tracker
961
962     """
963     op = opctx.op
964
965     result = False
966
967     while opctx.jobdeps:
968       (dep_job_id, dep_status) = opctx.jobdeps[0]
969
970       (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
971                                                           dep_status)
972       assert ht.TNonEmptyString(depmsg), "No dependency message"
973
974       logging.info("%s: %s", opctx.log_prefix, depmsg)
975
976       if depresult == _JobDependencyManager.CONTINUE:
977         # Remove dependency and continue
978         opctx.jobdeps.pop(0)
979
980       elif depresult == _JobDependencyManager.WAIT:
981         # Need to wait for notification, dependency tracker will re-add job
982         # to workerpool
983         result = True
984         break
985
986       elif depresult == _JobDependencyManager.CANCEL:
987         # Job was cancelled, cancel this job as well
988         job.Cancel()
989         assert op.status == constants.OP_STATUS_CANCELING
990         break
991
992       elif depresult in (_JobDependencyManager.WRONGSTATUS,
993                          _JobDependencyManager.ERROR):
994         # Job failed or there was an error, this job must fail
995         op.status = constants.OP_STATUS_ERROR
996         op.result = _EncodeOpError(errors.OpExecError(depmsg))
997         break
998
999       else:
1000         raise errors.ProgrammerError("Unknown dependency result '%s'" %
1001                                      depresult)
1002
1003     return result
1004
1005   def _ExecOpCodeUnlocked(self, opctx):
1006     """Processes one opcode and returns the result.
1007
1008     """
1009     op = opctx.op
1010
1011     assert op.status == constants.OP_STATUS_WAITING
1012
1013     timeout = opctx.GetNextLockTimeout()
1014
1015     try:
1016       # Make sure not to hold queue lock while calling ExecOpCode
1017       result = self.opexec_fn(op.input,
1018                               _OpExecCallbacks(self.queue, self.job, op),
1019                               timeout=timeout, priority=op.priority)
1020     except mcpu.LockAcquireTimeout:
1021       assert timeout is not None, "Received timeout for blocking acquire"
1022       logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1023
1024       assert op.status in (constants.OP_STATUS_WAITING,
1025                            constants.OP_STATUS_CANCELING)
1026
1027       # Was job cancelled while we were waiting for the lock?
1028       if op.status == constants.OP_STATUS_CANCELING:
1029         return (constants.OP_STATUS_CANCELING, None)
1030
1031       # Stay in waitlock while trying to re-acquire lock
1032       return (constants.OP_STATUS_WAITING, None)
1033     except CancelJob:
1034       logging.exception("%s: Canceling job", opctx.log_prefix)
1035       assert op.status == constants.OP_STATUS_CANCELING
1036       return (constants.OP_STATUS_CANCELING, None)
1037     except Exception, err: # pylint: disable=W0703
1038       logging.exception("%s: Caught exception in %s",
1039                         opctx.log_prefix, opctx.summary)
1040       return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1041     else:
1042       logging.debug("%s: %s successful",
1043                     opctx.log_prefix, opctx.summary)
1044       return (constants.OP_STATUS_SUCCESS, result)
1045
1046   def __call__(self, _nextop_fn=None):
1047     """Continues execution of a job.
1048
1049     @param _nextop_fn: Callback function for tests
1050     @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1051       be deferred and C{WAITDEP} if the dependency manager
1052       (L{_JobDependencyManager}) will re-schedule the job when appropriate
1053
1054     """
1055     queue = self.queue
1056     job = self.job
1057
1058     logging.debug("Processing job %s", job.id)
1059
1060     queue.acquire(shared=1)
1061     try:
1062       opcount = len(job.ops)
1063
1064       assert job.writable, "Expected writable job"
1065
1066       # Don't do anything for finalized jobs
1067       if job.CalcStatus() in constants.JOBS_FINALIZED:
1068         return self.FINISHED
1069
1070       # Is a previous opcode still pending?
1071       if job.cur_opctx:
1072         opctx = job.cur_opctx
1073         job.cur_opctx = None
1074       else:
1075         if __debug__ and _nextop_fn:
1076           _nextop_fn()
1077         opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1078
1079       op = opctx.op
1080
1081       # Consistency check
1082       assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1083                                      constants.OP_STATUS_CANCELING)
1084                         for i in job.ops[opctx.index + 1:])
1085
1086       assert op.status in (constants.OP_STATUS_QUEUED,
1087                            constants.OP_STATUS_WAITING,
1088                            constants.OP_STATUS_CANCELING)
1089
1090       assert (op.priority <= constants.OP_PRIO_LOWEST and
1091               op.priority >= constants.OP_PRIO_HIGHEST)
1092
1093       waitjob = None
1094
1095       if op.status != constants.OP_STATUS_CANCELING:
1096         assert op.status in (constants.OP_STATUS_QUEUED,
1097                              constants.OP_STATUS_WAITING)
1098
1099         # Prepare to start opcode
1100         if self._MarkWaitlock(job, op):
1101           # Write to disk
1102           queue.UpdateJobUnlocked(job)
1103
1104         assert op.status == constants.OP_STATUS_WAITING
1105         assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1106         assert job.start_timestamp and op.start_timestamp
1107         assert waitjob is None
1108
1109         # Check if waiting for a job is necessary
1110         waitjob = self._CheckDependencies(queue, job, opctx)
1111
1112         assert op.status in (constants.OP_STATUS_WAITING,
1113                              constants.OP_STATUS_CANCELING,
1114                              constants.OP_STATUS_ERROR)
1115
1116         if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1117                                          constants.OP_STATUS_ERROR)):
1118           logging.info("%s: opcode %s waiting for locks",
1119                        opctx.log_prefix, opctx.summary)
1120
1121           assert not opctx.jobdeps, "Not all dependencies were removed"
1122
1123           queue.release()
1124           try:
1125             (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1126           finally:
1127             queue.acquire(shared=1)
1128
1129           op.status = op_status
1130           op.result = op_result
1131
1132           assert not waitjob
1133
1134         if op.status == constants.OP_STATUS_WAITING:
1135           # Couldn't get locks in time
1136           assert not op.end_timestamp
1137         else:
1138           # Finalize opcode
1139           op.end_timestamp = TimeStampNow()
1140
1141           if op.status == constants.OP_STATUS_CANCELING:
1142             assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1143                                   for i in job.ops[opctx.index:])
1144           else:
1145             assert op.status in constants.OPS_FINALIZED
1146
1147       if op.status == constants.OP_STATUS_WAITING or waitjob:
1148         finalize = False
1149
1150         if not waitjob and opctx.CheckPriorityIncrease():
1151           # Priority was changed, need to update on-disk file
1152           queue.UpdateJobUnlocked(job)
1153
1154         # Keep around for another round
1155         job.cur_opctx = opctx
1156
1157         assert (op.priority <= constants.OP_PRIO_LOWEST and
1158                 op.priority >= constants.OP_PRIO_HIGHEST)
1159
1160         # In no case must the status be finalized here
1161         assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1162
1163       else:
1164         # Ensure all opcodes so far have been successful
1165         assert (opctx.index == 0 or
1166                 compat.all(i.status == constants.OP_STATUS_SUCCESS
1167                            for i in job.ops[:opctx.index]))
1168
1169         # Reset context
1170         job.cur_opctx = None
1171
1172         if op.status == constants.OP_STATUS_SUCCESS:
1173           finalize = False
1174
1175         elif op.status == constants.OP_STATUS_ERROR:
1176           # Ensure failed opcode has an exception as its result
1177           assert errors.GetEncodedError(job.ops[opctx.index].result)
1178
1179           to_encode = errors.OpExecError("Preceding opcode failed")
1180           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1181                                 _EncodeOpError(to_encode))
1182           finalize = True
1183
1184           # Consistency check
1185           assert compat.all(i.status == constants.OP_STATUS_ERROR and
1186                             errors.GetEncodedError(i.result)
1187                             for i in job.ops[opctx.index:])
1188
1189         elif op.status == constants.OP_STATUS_CANCELING:
1190           job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1191                                 "Job canceled by request")
1192           finalize = True
1193
1194         else:
1195           raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1196
1197         if opctx.index == (opcount - 1):
1198           # Finalize on last opcode
1199           finalize = True
1200
1201         if finalize:
1202           # All opcodes have been run, finalize job
1203           job.Finalize()
1204
1205         # Write to disk. If the job status is final, this is the final write
1206         # allowed. Once the file has been written, it can be archived anytime.
1207         queue.UpdateJobUnlocked(job)
1208
1209         assert not waitjob
1210
1211         if finalize:
1212           logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1213           return self.FINISHED
1214
1215       assert not waitjob or queue.depmgr.JobWaiting(job)
1216
1217       if waitjob:
1218         return self.WAITDEP
1219       else:
1220         return self.DEFER
1221     finally:
1222       assert job.writable, "Job became read-only while being processed"
1223       queue.release()
1224
1225
1226 def _EvaluateJobProcessorResult(depmgr, job, result):
1227   """Looks at a result from L{_JobProcessor} for a job.
1228
1229   To be used in a L{_JobQueueWorker}.
1230
1231   """
1232   if result == _JobProcessor.FINISHED:
1233     # Notify waiting jobs
1234     depmgr.NotifyWaiters(job.id)
1235
1236   elif result == _JobProcessor.DEFER:
1237     # Schedule again
1238     raise workerpool.DeferTask(priority=job.CalcPriority())
1239
1240   elif result == _JobProcessor.WAITDEP:
1241     # No-op, dependency manager will re-schedule
1242     pass
1243
1244   else:
1245     raise errors.ProgrammerError("Job processor returned unknown status %s" %
1246                                  (result, ))
1247
1248
1249 class _JobQueueWorker(workerpool.BaseWorker):
1250   """The actual job workers.
1251
1252   """
1253   def RunTask(self, job): # pylint: disable=W0221
1254     """Job executor.
1255
1256     @type job: L{_QueuedJob}
1257     @param job: the job to be processed
1258
1259     """
1260     assert job.writable, "Expected writable job"
1261
1262     # Ensure only one worker is active on a single job. If a job registers for
1263     # a dependency job, and the other job notifies before the first worker is
1264     # done, the job can end up in the tasklist more than once.
1265     job.processor_lock.acquire()
1266     try:
1267       return self._RunTaskInner(job)
1268     finally:
1269       job.processor_lock.release()
1270
1271   def _RunTaskInner(self, job):
1272     """Executes a job.
1273
1274     Must be called with per-job lock acquired.
1275
1276     """
1277     queue = job.queue
1278     assert queue == self.pool.queue
1279
1280     setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1281     setname_fn(None)
1282
1283     proc = mcpu.Processor(queue.context, job.id)
1284
1285     # Create wrapper for setting thread name
1286     wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1287                                     proc.ExecOpCode)
1288
1289     _EvaluateJobProcessorResult(queue.depmgr, job,
1290                                 _JobProcessor(queue, wrap_execop_fn, job)())
1291
1292   @staticmethod
1293   def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1294     """Updates the worker thread name to include a short summary of the opcode.
1295
1296     @param setname_fn: Callable setting worker thread name
1297     @param execop_fn: Callable for executing opcode (usually
1298                       L{mcpu.Processor.ExecOpCode})
1299
1300     """
1301     setname_fn(op)
1302     try:
1303       return execop_fn(op, *args, **kwargs)
1304     finally:
1305       setname_fn(None)
1306
1307   @staticmethod
1308   def _GetWorkerName(job, op):
1309     """Sets the worker thread name.
1310
1311     @type job: L{_QueuedJob}
1312     @type op: L{opcodes.OpCode}
1313
1314     """
1315     parts = ["Job%s" % job.id]
1316
1317     if op:
1318       parts.append(op.TinySummary())
1319
1320     return "/".join(parts)
1321
1322
1323 class _JobQueueWorkerPool(workerpool.WorkerPool):
1324   """Simple class implementing a job-processing workerpool.
1325
1326   """
1327   def __init__(self, queue):
1328     super(_JobQueueWorkerPool, self).__init__("Jq",
1329                                               JOBQUEUE_THREADS,
1330                                               _JobQueueWorker)
1331     self.queue = queue
1332
1333
1334 class _JobDependencyManager:
1335   """Keeps track of job dependencies.
1336
1337   """
1338   (WAIT,
1339    ERROR,
1340    CANCEL,
1341    CONTINUE,
1342    WRONGSTATUS) = range(1, 6)
1343
1344   def __init__(self, getstatus_fn, enqueue_fn):
1345     """Initializes this class.
1346
1347     """
1348     self._getstatus_fn = getstatus_fn
1349     self._enqueue_fn = enqueue_fn
1350
1351     self._waiters = {}
1352     self._lock = locking.SharedLock("JobDepMgr")
1353
1354   @locking.ssynchronized(_LOCK, shared=1)
1355   def GetLockInfo(self, requested): # pylint: disable=W0613
1356     """Retrieves information about waiting jobs.
1357
1358     @type requested: set
1359     @param requested: Requested information, see C{query.LQ_*}
1360
1361     """
1362     # No need to sort here, that's being done by the lock manager and query
1363     # library. There are no priorities for notifying jobs, hence all show up as
1364     # one item under "pending".
1365     return [("job/%s" % job_id, None, None,
1366              [("job", [job.id for job in waiters])])
1367             for job_id, waiters in self._waiters.items()
1368             if waiters]
1369
1370   @locking.ssynchronized(_LOCK, shared=1)
1371   def JobWaiting(self, job):
1372     """Checks if a job is waiting.
1373
1374     """
1375     return compat.any(job in jobs
1376                       for jobs in self._waiters.values())
1377
1378   @locking.ssynchronized(_LOCK)
1379   def CheckAndRegister(self, job, dep_job_id, dep_status):
1380     """Checks if a dependency job has the requested status.
1381
1382     If the other job is not yet in a finalized status, the calling job will be
1383     notified (re-added to the workerpool) at a later point.
1384
1385     @type job: L{_QueuedJob}
1386     @param job: Job object
1387     @type dep_job_id: int
1388     @param dep_job_id: ID of dependency job
1389     @type dep_status: list
1390     @param dep_status: Required status
1391
1392     """
1393     assert ht.TJobId(job.id)
1394     assert ht.TJobId(dep_job_id)
1395     assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1396
1397     if job.id == dep_job_id:
1398       return (self.ERROR, "Job can't depend on itself")
1399
1400     # Get status of dependency job
1401     try:
1402       status = self._getstatus_fn(dep_job_id)
1403     except errors.JobLost, err:
1404       return (self.ERROR, "Dependency error: %s" % err)
1405
1406     assert status in constants.JOB_STATUS_ALL
1407
1408     job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1409
1410     if status not in constants.JOBS_FINALIZED:
1411       # Register for notification and wait for job to finish
1412       job_id_waiters.add(job)
1413       return (self.WAIT,
1414               "Need to wait for job %s, wanted status '%s'" %
1415               (dep_job_id, dep_status))
1416
1417     # Remove from waiters list
1418     if job in job_id_waiters:
1419       job_id_waiters.remove(job)
1420
1421     if (status == constants.JOB_STATUS_CANCELED and
1422         constants.JOB_STATUS_CANCELED not in dep_status):
1423       return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1424
1425     elif not dep_status or status in dep_status:
1426       return (self.CONTINUE,
1427               "Dependency job %s finished with status '%s'" %
1428               (dep_job_id, status))
1429
1430     else:
1431       return (self.WRONGSTATUS,
1432               "Dependency job %s finished with status '%s',"
1433               " not one of '%s' as required" %
1434               (dep_job_id, status, utils.CommaJoin(dep_status)))
1435
1436   def _RemoveEmptyWaitersUnlocked(self):
1437     """Remove all jobs without actual waiters.
1438
1439     """
1440     for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1441                    if not waiters]:
1442       del self._waiters[job_id]
1443
1444   def NotifyWaiters(self, job_id):
1445     """Notifies all jobs waiting for a certain job ID.
1446
1447     @attention: Do not call until L{CheckAndRegister} returned a status other
1448       than C{WAITDEP} for C{job_id}, or behaviour is undefined
1449     @type job_id: int
1450     @param job_id: Job ID
1451
1452     """
1453     assert ht.TJobId(job_id)
1454
1455     self._lock.acquire()
1456     try:
1457       self._RemoveEmptyWaitersUnlocked()
1458
1459       jobs = self._waiters.pop(job_id, None)
1460     finally:
1461       self._lock.release()
1462
1463     if jobs:
1464       # Re-add jobs to workerpool
1465       logging.debug("Re-adding %s jobs which were waiting for job %s",
1466                     len(jobs), job_id)
1467       self._enqueue_fn(jobs)
1468
1469
1470 def _RequireOpenQueue(fn):
1471   """Decorator for "public" functions.
1472
1473   This function should be used for all 'public' functions. That is,
1474   functions usually called from other classes. Note that this should
1475   be applied only to methods (not plain functions), since it expects
1476   that the decorated function is called with a first argument that has
1477   a '_queue_filelock' argument.
1478
1479   @warning: Use this decorator only after locking.ssynchronized
1480
1481   Example::
1482     @locking.ssynchronized(_LOCK)
1483     @_RequireOpenQueue
1484     def Example(self):
1485       pass
1486
1487   """
1488   def wrapper(self, *args, **kwargs):
1489     # pylint: disable=W0212
1490     assert self._queue_filelock is not None, "Queue should be open"
1491     return fn(self, *args, **kwargs)
1492   return wrapper
1493
1494
1495 def _RequireNonDrainedQueue(fn):
1496   """Decorator checking for a non-drained queue.
1497
1498   To be used with functions submitting new jobs.
1499
1500   """
1501   def wrapper(self, *args, **kwargs):
1502     """Wrapper function.
1503
1504     @raise errors.JobQueueDrainError: if the job queue is marked for draining
1505
1506     """
1507     # Ok when sharing the big job queue lock, as the drain file is created when
1508     # the lock is exclusive.
1509     # Needs access to protected member, pylint: disable=W0212
1510     if self._drained:
1511       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1512
1513     if not self._accepting_jobs:
1514       raise errors.JobQueueError("Job queue is shutting down, refusing job")
1515
1516     return fn(self, *args, **kwargs)
1517   return wrapper
1518
1519
1520 class JobQueue(object):
1521   """Queue used to manage the jobs.
1522
1523   """
1524   def __init__(self, context):
1525     """Constructor for JobQueue.
1526
1527     The constructor will initialize the job queue object and then
1528     start loading the current jobs from disk, either for starting them
1529     (if they were queue) or for aborting them (if they were already
1530     running).
1531
1532     @type context: GanetiContext
1533     @param context: the context object for access to the configuration
1534         data and other ganeti objects
1535
1536     """
1537     self.context = context
1538     self._memcache = weakref.WeakValueDictionary()
1539     self._my_hostname = netutils.Hostname.GetSysName()
1540
1541     # The Big JobQueue lock. If a code block or method acquires it in shared
1542     # mode safe it must guarantee concurrency with all the code acquiring it in
1543     # shared mode, including itself. In order not to acquire it at all
1544     # concurrency must be guaranteed with all code acquiring it in shared mode
1545     # and all code acquiring it exclusively.
1546     self._lock = locking.SharedLock("JobQueue")
1547
1548     self.acquire = self._lock.acquire
1549     self.release = self._lock.release
1550
1551     # Accept jobs by default
1552     self._accepting_jobs = True
1553
1554     # Initialize the queue, and acquire the filelock.
1555     # This ensures no other process is working on the job queue.
1556     self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1557
1558     # Read serial file
1559     self._last_serial = jstore.ReadSerial()
1560     assert self._last_serial is not None, ("Serial file was modified between"
1561                                            " check in jstore and here")
1562
1563     # Get initial list of nodes
1564     self._nodes = dict((n.name, n.primary_ip)
1565                        for n in self.context.cfg.GetAllNodesInfo().values()
1566                        if n.master_candidate)
1567
1568     # Remove master node
1569     self._nodes.pop(self._my_hostname, None)
1570
1571     # TODO: Check consistency across nodes
1572
1573     self._queue_size = None
1574     self._UpdateQueueSizeUnlocked()
1575     assert ht.TInt(self._queue_size)
1576     self._drained = jstore.CheckDrainFlag()
1577
1578     # Job dependencies
1579     self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1580                                         self._EnqueueJobs)
1581     self.context.glm.AddToLockMonitor(self.depmgr)
1582
1583     # Setup worker pool
1584     self._wpool = _JobQueueWorkerPool(self)
1585     try:
1586       self._InspectQueue()
1587     except:
1588       self._wpool.TerminateWorkers()
1589       raise
1590
1591   @locking.ssynchronized(_LOCK)
1592   @_RequireOpenQueue
1593   def _InspectQueue(self):
1594     """Loads the whole job queue and resumes unfinished jobs.
1595
1596     This function needs the lock here because WorkerPool.AddTask() may start a
1597     job while we're still doing our work.
1598
1599     """
1600     logging.info("Inspecting job queue")
1601
1602     restartjobs = []
1603
1604     all_job_ids = self._GetJobIDsUnlocked()
1605     jobs_count = len(all_job_ids)
1606     lastinfo = time.time()
1607     for idx, job_id in enumerate(all_job_ids):
1608       # Give an update every 1000 jobs or 10 seconds
1609       if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1610           idx == (jobs_count - 1)):
1611         logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1612                      idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1613         lastinfo = time.time()
1614
1615       job = self._LoadJobUnlocked(job_id)
1616
1617       # a failure in loading the job can cause 'None' to be returned
1618       if job is None:
1619         continue
1620
1621       status = job.CalcStatus()
1622
1623       if status == constants.JOB_STATUS_QUEUED:
1624         restartjobs.append(job)
1625
1626       elif status in (constants.JOB_STATUS_RUNNING,
1627                       constants.JOB_STATUS_WAITING,
1628                       constants.JOB_STATUS_CANCELING):
1629         logging.warning("Unfinished job %s found: %s", job.id, job)
1630
1631         if status == constants.JOB_STATUS_WAITING:
1632           # Restart job
1633           job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1634           restartjobs.append(job)
1635         else:
1636           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1637                                 "Unclean master daemon shutdown")
1638           job.Finalize()
1639
1640         self.UpdateJobUnlocked(job)
1641
1642     if restartjobs:
1643       logging.info("Restarting %s jobs", len(restartjobs))
1644       self._EnqueueJobsUnlocked(restartjobs)
1645
1646     logging.info("Job queue inspection finished")
1647
1648   def _GetRpc(self, address_list):
1649     """Gets RPC runner with context.
1650
1651     """
1652     return rpc.JobQueueRunner(self.context, address_list)
1653
1654   @locking.ssynchronized(_LOCK)
1655   @_RequireOpenQueue
1656   def AddNode(self, node):
1657     """Register a new node with the queue.
1658
1659     @type node: L{objects.Node}
1660     @param node: the node object to be added
1661
1662     """
1663     node_name = node.name
1664     assert node_name != self._my_hostname
1665
1666     # Clean queue directory on added node
1667     result = self._GetRpc(None).call_jobqueue_purge(node_name)
1668     msg = result.fail_msg
1669     if msg:
1670       logging.warning("Cannot cleanup queue directory on node %s: %s",
1671                       node_name, msg)
1672
1673     if not node.master_candidate:
1674       # remove if existing, ignoring errors
1675       self._nodes.pop(node_name, None)
1676       # and skip the replication of the job ids
1677       return
1678
1679     # Upload the whole queue excluding archived jobs
1680     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1681
1682     # Upload current serial file
1683     files.append(constants.JOB_QUEUE_SERIAL_FILE)
1684
1685     # Static address list
1686     addrs = [node.primary_ip]
1687
1688     for file_name in files:
1689       # Read file content
1690       content = utils.ReadFile(file_name)
1691
1692       result = self._GetRpc(addrs).call_jobqueue_update([node_name], file_name,
1693                                                         content)
1694       msg = result[node_name].fail_msg
1695       if msg:
1696         logging.error("Failed to upload file %s to node %s: %s",
1697                       file_name, node_name, msg)
1698
1699     self._nodes[node_name] = node.primary_ip
1700
1701   @locking.ssynchronized(_LOCK)
1702   @_RequireOpenQueue
1703   def RemoveNode(self, node_name):
1704     """Callback called when removing nodes from the cluster.
1705
1706     @type node_name: str
1707     @param node_name: the name of the node to remove
1708
1709     """
1710     self._nodes.pop(node_name, None)
1711
1712   @staticmethod
1713   def _CheckRpcResult(result, nodes, failmsg):
1714     """Verifies the status of an RPC call.
1715
1716     Since we aim to keep consistency should this node (the current
1717     master) fail, we will log errors if our rpc fail, and especially
1718     log the case when more than half of the nodes fails.
1719
1720     @param result: the data as returned from the rpc call
1721     @type nodes: list
1722     @param nodes: the list of nodes we made the call to
1723     @type failmsg: str
1724     @param failmsg: the identifier to be used for logging
1725
1726     """
1727     failed = []
1728     success = []
1729
1730     for node in nodes:
1731       msg = result[node].fail_msg
1732       if msg:
1733         failed.append(node)
1734         logging.error("RPC call %s (%s) failed on node %s: %s",
1735                       result[node].call, failmsg, node, msg)
1736       else:
1737         success.append(node)
1738
1739     # +1 for the master node
1740     if (len(success) + 1) < len(failed):
1741       # TODO: Handle failing nodes
1742       logging.error("More than half of the nodes failed")
1743
1744   def _GetNodeIp(self):
1745     """Helper for returning the node name/ip list.
1746
1747     @rtype: (list, list)
1748     @return: a tuple of two lists, the first one with the node
1749         names and the second one with the node addresses
1750
1751     """
1752     # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1753     name_list = self._nodes.keys()
1754     addr_list = [self._nodes[name] for name in name_list]
1755     return name_list, addr_list
1756
1757   def _UpdateJobQueueFile(self, file_name, data, replicate):
1758     """Writes a file locally and then replicates it to all nodes.
1759
1760     This function will replace the contents of a file on the local
1761     node and then replicate it to all the other nodes we have.
1762
1763     @type file_name: str
1764     @param file_name: the path of the file to be replicated
1765     @type data: str
1766     @param data: the new contents of the file
1767     @type replicate: boolean
1768     @param replicate: whether to spread the changes to the remote nodes
1769
1770     """
1771     getents = runtime.GetEnts()
1772     utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1773                     gid=getents.masterd_gid)
1774
1775     if replicate:
1776       names, addrs = self._GetNodeIp()
1777       result = self._GetRpc(addrs).call_jobqueue_update(names, file_name, data)
1778       self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1779
1780   def _RenameFilesUnlocked(self, rename):
1781     """Renames a file locally and then replicate the change.
1782
1783     This function will rename a file in the local queue directory
1784     and then replicate this rename to all the other nodes we have.
1785
1786     @type rename: list of (old, new)
1787     @param rename: List containing tuples mapping old to new names
1788
1789     """
1790     # Rename them locally
1791     for old, new in rename:
1792       utils.RenameFile(old, new, mkdir=True)
1793
1794     # ... and on all nodes
1795     names, addrs = self._GetNodeIp()
1796     result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
1797     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1798
1799   def _NewSerialsUnlocked(self, count):
1800     """Generates a new job identifier.
1801
1802     Job identifiers are unique during the lifetime of a cluster.
1803
1804     @type count: integer
1805     @param count: how many serials to return
1806     @rtype: list of int
1807     @return: a list of job identifiers.
1808
1809     """
1810     assert ht.TPositiveInt(count)
1811
1812     # New number
1813     serial = self._last_serial + count
1814
1815     # Write to file
1816     self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1817                              "%s\n" % serial, True)
1818
1819     result = [jstore.FormatJobID(v)
1820               for v in range(self._last_serial + 1, serial + 1)]
1821
1822     # Keep it only if we were able to write the file
1823     self._last_serial = serial
1824
1825     assert len(result) == count
1826
1827     return result
1828
1829   @staticmethod
1830   def _GetJobPath(job_id):
1831     """Returns the job file for a given job id.
1832
1833     @type job_id: str
1834     @param job_id: the job identifier
1835     @rtype: str
1836     @return: the path to the job file
1837
1838     """
1839     return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1840
1841   @staticmethod
1842   def _GetArchivedJobPath(job_id):
1843     """Returns the archived job file for a give job id.
1844
1845     @type job_id: str
1846     @param job_id: the job identifier
1847     @rtype: str
1848     @return: the path to the archived job file
1849
1850     """
1851     return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1852                           jstore.GetArchiveDirectory(job_id),
1853                           "job-%s" % job_id)
1854
1855   @staticmethod
1856   def _GetJobIDsUnlocked(sort=True):
1857     """Return all known job IDs.
1858
1859     The method only looks at disk because it's a requirement that all
1860     jobs are present on disk (so in the _memcache we don't have any
1861     extra IDs).
1862
1863     @type sort: boolean
1864     @param sort: perform sorting on the returned job ids
1865     @rtype: list
1866     @return: the list of job IDs
1867
1868     """
1869     jlist = []
1870     for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1871       m = constants.JOB_FILE_RE.match(filename)
1872       if m:
1873         jlist.append(int(m.group(1)))
1874     if sort:
1875       jlist.sort()
1876     return jlist
1877
1878   def _LoadJobUnlocked(self, job_id):
1879     """Loads a job from the disk or memory.
1880
1881     Given a job id, this will return the cached job object if
1882     existing, or try to load the job from the disk. If loading from
1883     disk, it will also add the job to the cache.
1884
1885     @type job_id: int
1886     @param job_id: the job id
1887     @rtype: L{_QueuedJob} or None
1888     @return: either None or the job object
1889
1890     """
1891     job = self._memcache.get(job_id, None)
1892     if job:
1893       logging.debug("Found job %s in memcache", job_id)
1894       assert job.writable, "Found read-only job in memcache"
1895       return job
1896
1897     try:
1898       job = self._LoadJobFromDisk(job_id, False)
1899       if job is None:
1900         return job
1901     except errors.JobFileCorrupted:
1902       old_path = self._GetJobPath(job_id)
1903       new_path = self._GetArchivedJobPath(job_id)
1904       if old_path == new_path:
1905         # job already archived (future case)
1906         logging.exception("Can't parse job %s", job_id)
1907       else:
1908         # non-archived case
1909         logging.exception("Can't parse job %s, will archive.", job_id)
1910         self._RenameFilesUnlocked([(old_path, new_path)])
1911       return None
1912
1913     assert job.writable, "Job just loaded is not writable"
1914
1915     self._memcache[job_id] = job
1916     logging.debug("Added job %s to the cache", job_id)
1917     return job
1918
1919   def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
1920     """Load the given job file from disk.
1921
1922     Given a job file, read, load and restore it in a _QueuedJob format.
1923
1924     @type job_id: int
1925     @param job_id: job identifier
1926     @type try_archived: bool
1927     @param try_archived: Whether to try loading an archived job
1928     @rtype: L{_QueuedJob} or None
1929     @return: either None or the job object
1930
1931     """
1932     path_functions = [(self._GetJobPath, True)]
1933
1934     if try_archived:
1935       path_functions.append((self._GetArchivedJobPath, False))
1936
1937     raw_data = None
1938     writable_default = None
1939
1940     for (fn, writable_default) in path_functions:
1941       filepath = fn(job_id)
1942       logging.debug("Loading job from %s", filepath)
1943       try:
1944         raw_data = utils.ReadFile(filepath)
1945       except EnvironmentError, err:
1946         if err.errno != errno.ENOENT:
1947           raise
1948       else:
1949         break
1950
1951     if not raw_data:
1952       return None
1953
1954     if writable is None:
1955       writable = writable_default
1956
1957     try:
1958       data = serializer.LoadJson(raw_data)
1959       job = _QueuedJob.Restore(self, data, writable)
1960     except Exception, err: # pylint: disable=W0703
1961       raise errors.JobFileCorrupted(err)
1962
1963     return job
1964
1965   def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
1966     """Load the given job file from disk.
1967
1968     Given a job file, read, load and restore it in a _QueuedJob format.
1969     In case of error reading the job, it gets returned as None, and the
1970     exception is logged.
1971
1972     @type job_id: int
1973     @param job_id: job identifier
1974     @type try_archived: bool
1975     @param try_archived: Whether to try loading an archived job
1976     @rtype: L{_QueuedJob} or None
1977     @return: either None or the job object
1978
1979     """
1980     try:
1981       return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
1982     except (errors.JobFileCorrupted, EnvironmentError):
1983       logging.exception("Can't load/parse job %s", job_id)
1984       return None
1985
1986   def _UpdateQueueSizeUnlocked(self):
1987     """Update the queue size.
1988
1989     """
1990     self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1991
1992   @locking.ssynchronized(_LOCK)
1993   @_RequireOpenQueue
1994   def SetDrainFlag(self, drain_flag):
1995     """Sets the drain flag for the queue.
1996
1997     @type drain_flag: boolean
1998     @param drain_flag: Whether to set or unset the drain flag
1999
2000     """
2001     jstore.SetDrainFlag(drain_flag)
2002
2003     self._drained = drain_flag
2004
2005     return True
2006
2007   @_RequireOpenQueue
2008   def _SubmitJobUnlocked(self, job_id, ops):
2009     """Create and store a new job.
2010
2011     This enters the job into our job queue and also puts it on the new
2012     queue, in order for it to be picked up by the queue processors.
2013
2014     @type job_id: job ID
2015     @param job_id: the job ID for the new job
2016     @type ops: list
2017     @param ops: The list of OpCodes that will become the new job.
2018     @rtype: L{_QueuedJob}
2019     @return: the job object to be queued
2020     @raise errors.JobQueueFull: if the job queue has too many jobs in it
2021     @raise errors.GenericError: If an opcode is not valid
2022
2023     """
2024     if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
2025       raise errors.JobQueueFull()
2026
2027     job = _QueuedJob(self, job_id, ops, True)
2028
2029     # Check priority
2030     for idx, op in enumerate(job.ops):
2031       if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
2032         allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2033         raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
2034                                   " are %s" % (idx, op.priority, allowed))
2035
2036       dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
2037       if not opcodes.TNoRelativeJobDependencies(dependencies):
2038         raise errors.GenericError("Opcode %s has invalid dependencies, must"
2039                                   " match %s: %s" %
2040                                   (idx, opcodes.TNoRelativeJobDependencies,
2041                                    dependencies))
2042
2043     # Write to disk
2044     self.UpdateJobUnlocked(job)
2045
2046     self._queue_size += 1
2047
2048     logging.debug("Adding new job %s to the cache", job_id)
2049     self._memcache[job_id] = job
2050
2051     return job
2052
2053   @locking.ssynchronized(_LOCK)
2054   @_RequireOpenQueue
2055   @_RequireNonDrainedQueue
2056   def SubmitJob(self, ops):
2057     """Create and store a new job.
2058
2059     @see: L{_SubmitJobUnlocked}
2060
2061     """
2062     (job_id, ) = self._NewSerialsUnlocked(1)
2063     self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
2064     return job_id
2065
2066   @locking.ssynchronized(_LOCK)
2067   @_RequireOpenQueue
2068   @_RequireNonDrainedQueue
2069   def SubmitManyJobs(self, jobs):
2070     """Create and store multiple jobs.
2071
2072     @see: L{_SubmitJobUnlocked}
2073
2074     """
2075     all_job_ids = self._NewSerialsUnlocked(len(jobs))
2076
2077     (results, added_jobs) = \
2078       self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])
2079
2080     self._EnqueueJobsUnlocked(added_jobs)
2081
2082     return results
2083
2084   @staticmethod
2085   def _FormatSubmitError(msg, ops):
2086     """Formats errors which occurred while submitting a job.
2087
2088     """
2089     return ("%s; opcodes %s" %
2090             (msg, utils.CommaJoin(op.Summary() for op in ops)))
2091
2092   @staticmethod
2093   def _ResolveJobDependencies(resolve_fn, deps):
2094     """Resolves relative job IDs in dependencies.
2095
2096     @type resolve_fn: callable
2097     @param resolve_fn: Function to resolve a relative job ID
2098     @type deps: list
2099     @param deps: Dependencies
2100     @rtype: list
2101     @return: Resolved dependencies
2102
2103     """
2104     result = []
2105
2106     for (dep_job_id, dep_status) in deps:
2107       if ht.TRelativeJobId(dep_job_id):
2108         assert ht.TInt(dep_job_id) and dep_job_id < 0
2109         try:
2110           job_id = resolve_fn(dep_job_id)
2111         except IndexError:
2112           # Abort
2113           return (False, "Unable to resolve relative job ID %s" % dep_job_id)
2114       else:
2115         job_id = dep_job_id
2116
2117       result.append((job_id, dep_status))
2118
2119     return (True, result)
2120
2121   def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
2122     """Create and store multiple jobs.
2123
2124     @see: L{_SubmitJobUnlocked}
2125
2126     """
2127     results = []
2128     added_jobs = []
2129
2130     def resolve_fn(job_idx, reljobid):
2131       assert reljobid < 0
2132       return (previous_job_ids + job_ids[:job_idx])[reljobid]
2133
2134     for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
2135       for op in ops:
2136         if getattr(op, opcodes.DEPEND_ATTR, None):
2137           (status, data) = \
2138             self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
2139                                          op.depends)
2140           if not status:
2141             # Abort resolving dependencies
2142             assert ht.TNonEmptyString(data), "No error message"
2143             break
2144           # Use resolved dependencies
2145           op.depends = data
2146       else:
2147         try:
2148           job = self._SubmitJobUnlocked(job_id, ops)
2149         except errors.GenericError, err:
2150           status = False
2151           data = self._FormatSubmitError(str(err), ops)
2152         else:
2153           status = True
2154           data = job_id
2155           added_jobs.append(job)
2156
2157       results.append((status, data))
2158
2159     return (results, added_jobs)
2160
2161   @locking.ssynchronized(_LOCK)
2162   def _EnqueueJobs(self, jobs):
2163     """Helper function to add jobs to worker pool's queue.
2164
2165     @type jobs: list
2166     @param jobs: List of all jobs
2167
2168     """
2169     return self._EnqueueJobsUnlocked(jobs)
2170
2171   def _EnqueueJobsUnlocked(self, jobs):
2172     """Helper function to add jobs to worker pool's queue.
2173
2174     @type jobs: list
2175     @param jobs: List of all jobs
2176
2177     """
2178     assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
2179     self._wpool.AddManyTasks([(job, ) for job in jobs],
2180                              priority=[job.CalcPriority() for job in jobs])
2181
2182   def _GetJobStatusForDependencies(self, job_id):
2183     """Gets the status of a job for dependencies.
2184
2185     @type job_id: int
2186     @param job_id: Job ID
2187     @raise errors.JobLost: If job can't be found
2188
2189     """
2190     # Not using in-memory cache as doing so would require an exclusive lock
2191
2192     # Try to load from disk
2193     job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2194
2195     assert not job.writable, "Got writable job" # pylint: disable=E1101
2196
2197     if job:
2198       return job.CalcStatus()
2199
2200     raise errors.JobLost("Job %s not found" % job_id)
2201
2202   @_RequireOpenQueue
2203   def UpdateJobUnlocked(self, job, replicate=True):
2204     """Update a job's on disk storage.
2205
2206     After a job has been modified, this function needs to be called in
2207     order to write the changes to disk and replicate them to the other
2208     nodes.
2209
2210     @type job: L{_QueuedJob}
2211     @param job: the changed job
2212     @type replicate: boolean
2213     @param replicate: whether to replicate the change to remote nodes
2214
2215     """
2216     if __debug__:
2217       finalized = job.CalcStatus() in constants.JOBS_FINALIZED
2218       assert (finalized ^ (job.end_timestamp is None))
2219       assert job.writable, "Can't update read-only job"
2220
2221     filename = self._GetJobPath(job.id)
2222     data = serializer.DumpJson(job.Serialize())
2223     logging.debug("Writing job %s to %s", job.id, filename)
2224     self._UpdateJobQueueFile(filename, data, replicate)
2225
2226   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
2227                         timeout):
2228     """Waits for changes in a job.
2229
2230     @type job_id: int
2231     @param job_id: Job identifier
2232     @type fields: list of strings
2233     @param fields: Which fields to check for changes
2234     @type prev_job_info: list or None
2235     @param prev_job_info: Last job information returned
2236     @type prev_log_serial: int
2237     @param prev_log_serial: Last job message serial number
2238     @type timeout: float
2239     @param timeout: maximum time to wait in seconds
2240     @rtype: tuple (job info, log entries)
2241     @return: a tuple of the job information as required via
2242         the fields parameter, and the log entries as a list
2243
2244         if the job has not changed and the timeout has expired,
2245         we instead return a special value,
2246         L{constants.JOB_NOTCHANGED}, which should be interpreted
2247         as such by the clients
2248
2249     """
2250     load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, False,
2251                              writable=False)
2252
2253     helper = _WaitForJobChangesHelper()
2254
2255     return helper(self._GetJobPath(job_id), load_fn,
2256                   fields, prev_job_info, prev_log_serial, timeout)
2257
2258   @locking.ssynchronized(_LOCK)
2259   @_RequireOpenQueue
2260   def CancelJob(self, job_id):
2261     """Cancels a job.
2262
2263     This will only succeed if the job has not started yet.
2264
2265     @type job_id: int
2266     @param job_id: job ID of job to be cancelled.
2267
2268     """
2269     logging.info("Cancelling job %s", job_id)
2270
2271     job = self._LoadJobUnlocked(job_id)
2272     if not job:
2273       logging.debug("Job %s not found", job_id)
2274       return (False, "Job %s not found" % job_id)
2275
2276     assert job.writable, "Can't cancel read-only job"
2277
2278     (success, msg) = job.Cancel()
2279
2280     if success:
2281       # If the job was finalized (e.g. cancelled), this is the final write
2282       # allowed. The job can be archived anytime.
2283       self.UpdateJobUnlocked(job)
2284
2285     return (success, msg)
2286
2287   @_RequireOpenQueue
2288   def _ArchiveJobsUnlocked(self, jobs):
2289     """Archives jobs.
2290
2291     @type jobs: list of L{_QueuedJob}
2292     @param jobs: Job objects
2293     @rtype: int
2294     @return: Number of archived jobs
2295
2296     """
2297     archive_jobs = []
2298     rename_files = []
2299     for job in jobs:
2300       assert job.writable, "Can't archive read-only job"
2301
2302       if job.CalcStatus() not in constants.JOBS_FINALIZED:
2303         logging.debug("Job %s is not yet done", job.id)
2304         continue
2305
2306       archive_jobs.append(job)
2307
2308       old = self._GetJobPath(job.id)
2309       new = self._GetArchivedJobPath(job.id)
2310       rename_files.append((old, new))
2311
2312     # TODO: What if 1..n files fail to rename?
2313     self._RenameFilesUnlocked(rename_files)
2314
2315     logging.debug("Successfully archived job(s) %s",
2316                   utils.CommaJoin(job.id for job in archive_jobs))
2317
2318     # Since we haven't quite checked, above, if we succeeded or failed renaming
2319     # the files, we update the cached queue size from the filesystem. When we
2320     # get around to fix the TODO: above, we can use the number of actually
2321     # archived jobs to fix this.
2322     self._UpdateQueueSizeUnlocked()
2323     return len(archive_jobs)
2324
2325   @locking.ssynchronized(_LOCK)
2326   @_RequireOpenQueue
2327   def ArchiveJob(self, job_id):
2328     """Archives a job.
2329
2330     This is just a wrapper over L{_ArchiveJobsUnlocked}.
2331
2332     @type job_id: int
2333     @param job_id: Job ID of job to be archived.
2334     @rtype: bool
2335     @return: Whether job was archived
2336
2337     """
2338     logging.info("Archiving job %s", job_id)
2339
2340     job = self._LoadJobUnlocked(job_id)
2341     if not job:
2342       logging.debug("Job %s not found", job_id)
2343       return False
2344
2345     return self._ArchiveJobsUnlocked([job]) == 1
2346
2347   @locking.ssynchronized(_LOCK)
2348   @_RequireOpenQueue
2349   def AutoArchiveJobs(self, age, timeout):
2350     """Archives all jobs based on age.
2351
2352     The method will archive all jobs which are older than the age
2353     parameter. For jobs that don't have an end timestamp, the start
2354     timestamp will be considered. The special '-1' age will cause
2355     archival of all jobs (that are not running or queued).
2356
2357     @type age: int
2358     @param age: the minimum age in seconds
2359
2360     """
2361     logging.info("Archiving jobs with age more than %s seconds", age)
2362
2363     now = time.time()
2364     end_time = now + timeout
2365     archived_count = 0
2366     last_touched = 0
2367
2368     all_job_ids = self._GetJobIDsUnlocked()
2369     pending = []
2370     for idx, job_id in enumerate(all_job_ids):
2371       last_touched = idx + 1
2372
2373       # Not optimal because jobs could be pending
2374       # TODO: Measure average duration for job archival and take number of
2375       # pending jobs into account.
2376       if time.time() > end_time:
2377         break
2378
2379       # Returns None if the job failed to load
2380       job = self._LoadJobUnlocked(job_id)
2381       if job:
2382         if job.end_timestamp is None:
2383           if job.start_timestamp is None:
2384             job_age = job.received_timestamp
2385           else:
2386             job_age = job.start_timestamp
2387         else:
2388           job_age = job.end_timestamp
2389
2390         if age == -1 or now - job_age[0] > age:
2391           pending.append(job)
2392
2393           # Archive 10 jobs at a time
2394           if len(pending) >= 10:
2395             archived_count += self._ArchiveJobsUnlocked(pending)
2396             pending = []
2397
2398     if pending:
2399       archived_count += self._ArchiveJobsUnlocked(pending)
2400
2401     return (archived_count, len(all_job_ids) - last_touched)
2402
2403   def _Query(self, fields, qfilter):
2404     qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
2405                        namefield="id")
2406
2407     job_ids = qobj.RequestedNames()
2408
2409     list_all = (job_ids is None)
2410
2411     if list_all:
2412       # Since files are added to/removed from the queue atomically, there's no
2413       # risk of getting the job ids in an inconsistent state.
2414       job_ids = self._GetJobIDsUnlocked()
2415
2416     jobs = []
2417
2418     for job_id in job_ids:
2419       job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2420       if job is not None or not list_all:
2421         jobs.append((job_id, job))
2422
2423     return (qobj, jobs, list_all)
2424
2425   def QueryJobs(self, fields, qfilter):
2426     """Returns a list of jobs in queue.
2427
2428     @type fields: sequence
2429     @param fields: List of wanted fields
2430     @type qfilter: None or query2 filter (list)
2431     @param qfilter: Query filter
2432
2433     """
2434     (qobj, ctx, _) = self._Query(fields, qfilter)
2435
2436     return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
2437
2438   def OldStyleQueryJobs(self, job_ids, fields):
2439     """Returns a list of jobs in queue.
2440
2441     @type job_ids: list
2442     @param job_ids: sequence of job identifiers or None for all
2443     @type fields: list
2444     @param fields: names of fields to return
2445     @rtype: list
2446     @return: list one element per job, each element being list with
2447         the requested fields
2448
2449     """
2450     # backwards compat:
2451     job_ids = [int(jid) for jid in job_ids]
2452     qfilter = qlang.MakeSimpleFilter("id", job_ids)
2453
2454     (qobj, ctx, _) = self._Query(fields, qfilter)
2455
2456     return qobj.OldStyleQuery(ctx, sort_by_name=False)
2457
2458   @locking.ssynchronized(_LOCK)
2459   def PrepareShutdown(self):
2460     """Prepare to stop the job queue.
2461
2462     Disables execution of jobs in the workerpool and returns whether there are
2463     any jobs currently running. If the latter is the case, the job queue is not
2464     yet ready for shutdown. Once this function returns C{True} L{Shutdown} can
2465     be called without interfering with any job. Queued and unfinished jobs will
2466     be resumed next time.
2467
2468     Once this function has been called no new job submissions will be accepted
2469     (see L{_RequireNonDrainedQueue}).
2470
2471     @rtype: bool
2472     @return: Whether there are any running jobs
2473
2474     """
2475     if self._accepting_jobs:
2476       self._accepting_jobs = False
2477
2478       # Tell worker pool to stop processing pending tasks
2479       self._wpool.SetActive(False)
2480
2481     return self._wpool.HasRunningTasks()
2482
2483   @locking.ssynchronized(_LOCK)
2484   @_RequireOpenQueue
2485   def Shutdown(self):
2486     """Stops the job queue.
2487
2488     This shutdowns all the worker threads an closes the queue.
2489
2490     """
2491     self._wpool.TerminateWorkers()
2492
2493     self._queue_filelock.Close()
2494     self._queue_filelock = None