jqueue/mcpu: Determine priority using callback
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import logging
33 import errno
34 import time
35 import weakref
36 import threading
37 import itertools
38
39 try:
40   # pylint: disable=E0611
41   from pyinotify import pyinotify
42 except ImportError:
43   import pyinotify
44
45 from ganeti import asyncnotifier
46 from ganeti import constants
47 from ganeti import serializer
48 from ganeti import workerpool
49 from ganeti import locking
50 from ganeti import opcodes
51 from ganeti import errors
52 from ganeti import mcpu
53 from ganeti import utils
54 from ganeti import jstore
55 from ganeti import rpc
56 from ganeti import runtime
57 from ganeti import netutils
58 from ganeti import compat
59 from ganeti import ht
60 from ganeti import query
61 from ganeti import qlang
62 from ganeti import pathutils
63 from ganeti import vcluster
64
65
66 JOBQUEUE_THREADS = 25
67
68 # member lock names to be passed to @ssynchronized decorator
69 _LOCK = "_lock"
70 _QUEUE = "_queue"
71
72
73 class CancelJob(Exception):
74   """Special exception to cancel a job.
75
76   """
77
78
79 class QueueShutdown(Exception):
80   """Special exception to abort a job when the job queue is shutting down.
81
82   """
83
84
85 def TimeStampNow():
86   """Returns the current timestamp.
87
88   @rtype: tuple
89   @return: the current time in the (seconds, microseconds) format
90
91   """
92   return utils.SplitTime(time.time())
93
94
95 def _CallJqUpdate(runner, names, file_name, content):
96   """Updates job queue file after virtualizing filename.
97
98   """
99   virt_file_name = vcluster.MakeVirtualPath(file_name)
100   return runner.call_jobqueue_update(names, virt_file_name, content)
101
102
103 class _SimpleJobQuery:
104   """Wrapper for job queries.
105
106   Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.
107
108   """
109   def __init__(self, fields):
110     """Initializes this class.
111
112     """
113     self._query = query.Query(query.JOB_FIELDS, fields)
114
115   def __call__(self, job):
116     """Executes a job query using cached field list.
117
118     """
119     return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
120
121
122 class _QueuedOpCode(object):
123   """Encapsulates an opcode object.
124
125   @ivar log: holds the execution log and consists of tuples
126   of the form C{(log_serial, timestamp, level, message)}
127   @ivar input: the OpCode we encapsulate
128   @ivar status: the current status
129   @ivar result: the result of the LU execution
130   @ivar start_timestamp: timestamp for the start of the execution
131   @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
132   @ivar stop_timestamp: timestamp for the end of the execution
133
134   """
135   __slots__ = ["input", "status", "result", "log", "priority",
136                "start_timestamp", "exec_timestamp", "end_timestamp",
137                "__weakref__"]
138
139   def __init__(self, op):
140     """Initializes instances of this class.
141
142     @type op: L{opcodes.OpCode}
143     @param op: the opcode we encapsulate
144
145     """
146     self.input = op
147     self.status = constants.OP_STATUS_QUEUED
148     self.result = None
149     self.log = []
150     self.start_timestamp = None
151     self.exec_timestamp = None
152     self.end_timestamp = None
153
154     # Get initial priority (it might change during the lifetime of this opcode)
155     self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
156
157   @classmethod
158   def Restore(cls, state):
159     """Restore the _QueuedOpCode from the serialized form.
160
161     @type state: dict
162     @param state: the serialized state
163     @rtype: _QueuedOpCode
164     @return: a new _QueuedOpCode instance
165
166     """
167     obj = _QueuedOpCode.__new__(cls)
168     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
169     obj.status = state["status"]
170     obj.result = state["result"]
171     obj.log = state["log"]
172     obj.start_timestamp = state.get("start_timestamp", None)
173     obj.exec_timestamp = state.get("exec_timestamp", None)
174     obj.end_timestamp = state.get("end_timestamp", None)
175     obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
176     return obj
177
178   def Serialize(self):
179     """Serializes this _QueuedOpCode.
180
181     @rtype: dict
182     @return: the dictionary holding the serialized state
183
184     """
185     return {
186       "input": self.input.__getstate__(),
187       "status": self.status,
188       "result": self.result,
189       "log": self.log,
190       "start_timestamp": self.start_timestamp,
191       "exec_timestamp": self.exec_timestamp,
192       "end_timestamp": self.end_timestamp,
193       "priority": self.priority,
194       }
195
196
197 class _QueuedJob(object):
198   """In-memory job representation.
199
200   This is what we use to track the user-submitted jobs. Locking must
201   be taken care of by users of this class.
202
203   @type queue: L{JobQueue}
204   @ivar queue: the parent queue
205   @ivar id: the job ID
206   @type ops: list
207   @ivar ops: the list of _QueuedOpCode that constitute the job
208   @type log_serial: int
209   @ivar log_serial: holds the index for the next log entry
210   @ivar received_timestamp: the timestamp for when the job was received
211   @ivar start_timestmap: the timestamp for start of execution
212   @ivar end_timestamp: the timestamp for end of execution
213   @ivar writable: Whether the job is allowed to be modified
214
215   """
216   # pylint: disable=W0212
217   __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
218                "received_timestamp", "start_timestamp", "end_timestamp",
219                "__weakref__", "processor_lock", "writable", "archived"]
220
221   def __init__(self, queue, job_id, ops, writable):
222     """Constructor for the _QueuedJob.
223
224     @type queue: L{JobQueue}
225     @param queue: our parent queue
226     @type job_id: job_id
227     @param job_id: our job id
228     @type ops: list
229     @param ops: the list of opcodes we hold, which will be encapsulated
230         in _QueuedOpCodes
231     @type writable: bool
232     @param writable: Whether job can be modified
233
234     """
235     if not ops:
236       raise errors.GenericError("A job needs at least one opcode")
237
238     self.queue = queue
239     self.id = int(job_id)
240     self.ops = [_QueuedOpCode(op) for op in ops]
241     self.log_serial = 0
242     self.received_timestamp = TimeStampNow()
243     self.start_timestamp = None
244     self.end_timestamp = None
245     self.archived = False
246
247     self._InitInMemory(self, writable)
248
249     assert not self.archived, "New jobs can not be marked as archived"
250
251   @staticmethod
252   def _InitInMemory(obj, writable):
253     """Initializes in-memory variables.
254
255     """
256     obj.writable = writable
257     obj.ops_iter = None
258     obj.cur_opctx = None
259
260     # Read-only jobs are not processed and therefore don't need a lock
261     if writable:
262       obj.processor_lock = threading.Lock()
263     else:
264       obj.processor_lock = None
265
266   def __repr__(self):
267     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
268               "id=%s" % self.id,
269               "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
270
271     return "<%s at %#x>" % (" ".join(status), id(self))
272
273   @classmethod
274   def Restore(cls, queue, state, writable, archived):
275     """Restore a _QueuedJob from serialized state:
276
277     @type queue: L{JobQueue}
278     @param queue: to which queue the restored job belongs
279     @type state: dict
280     @param state: the serialized state
281     @type writable: bool
282     @param writable: Whether job can be modified
283     @type archived: bool
284     @param archived: Whether job was already archived
285     @rtype: _JobQueue
286     @return: the restored _JobQueue instance
287
288     """
289     obj = _QueuedJob.__new__(cls)
290     obj.queue = queue
291     obj.id = int(state["id"])
292     obj.received_timestamp = state.get("received_timestamp", None)
293     obj.start_timestamp = state.get("start_timestamp", None)
294     obj.end_timestamp = state.get("end_timestamp", None)
295     obj.archived = archived
296
297     obj.ops = []
298     obj.log_serial = 0
299     for op_state in state["ops"]:
300       op = _QueuedOpCode.Restore(op_state)
301       for log_entry in op.log:
302         obj.log_serial = max(obj.log_serial, log_entry[0])
303       obj.ops.append(op)
304
305     cls._InitInMemory(obj, writable)
306
307     return obj
308
309   def Serialize(self):
310     """Serialize the _JobQueue instance.
311
312     @rtype: dict
313     @return: the serialized state
314
315     """
316     return {
317       "id": self.id,
318       "ops": [op.Serialize() for op in self.ops],
319       "start_timestamp": self.start_timestamp,
320       "end_timestamp": self.end_timestamp,
321       "received_timestamp": self.received_timestamp,
322       }
323
324   def CalcStatus(self):
325     """Compute the status of this job.
326
327     This function iterates over all the _QueuedOpCodes in the job and
328     based on their status, computes the job status.
329
330     The algorithm is:
331       - if we find a cancelled, or finished with error, the job
332         status will be the same
333       - otherwise, the last opcode with the status one of:
334           - waitlock
335           - canceling
336           - running
337
338         will determine the job status
339
340       - otherwise, it means either all opcodes are queued, or success,
341         and the job status will be the same
342
343     @return: the job status
344
345     """
346     status = constants.JOB_STATUS_QUEUED
347
348     all_success = True
349     for op in self.ops:
350       if op.status == constants.OP_STATUS_SUCCESS:
351         continue
352
353       all_success = False
354
355       if op.status == constants.OP_STATUS_QUEUED:
356         pass
357       elif op.status == constants.OP_STATUS_WAITING:
358         status = constants.JOB_STATUS_WAITING
359       elif op.status == constants.OP_STATUS_RUNNING:
360         status = constants.JOB_STATUS_RUNNING
361       elif op.status == constants.OP_STATUS_CANCELING:
362         status = constants.JOB_STATUS_CANCELING
363         break
364       elif op.status == constants.OP_STATUS_ERROR:
365         status = constants.JOB_STATUS_ERROR
366         # The whole job fails if one opcode failed
367         break
368       elif op.status == constants.OP_STATUS_CANCELED:
369         status = constants.OP_STATUS_CANCELED
370         break
371
372     if all_success:
373       status = constants.JOB_STATUS_SUCCESS
374
375     return status
376
377   def CalcPriority(self):
378     """Gets the current priority for this job.
379
380     Only unfinished opcodes are considered. When all are done, the default
381     priority is used.
382
383     @rtype: int
384
385     """
386     priorities = [op.priority for op in self.ops
387                   if op.status not in constants.OPS_FINALIZED]
388
389     if not priorities:
390       # All opcodes are done, assume default priority
391       return constants.OP_PRIO_DEFAULT
392
393     return min(priorities)
394
395   def GetLogEntries(self, newer_than):
396     """Selectively returns the log entries.
397
398     @type newer_than: None or int
399     @param newer_than: if this is None, return all log entries,
400         otherwise return only the log entries with serial higher
401         than this value
402     @rtype: list
403     @return: the list of the log entries selected
404
405     """
406     if newer_than is None:
407       serial = -1
408     else:
409       serial = newer_than
410
411     entries = []
412     for op in self.ops:
413       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
414
415     return entries
416
417   def GetInfo(self, fields):
418     """Returns information about a job.
419
420     @type fields: list
421     @param fields: names of fields to return
422     @rtype: list
423     @return: list with one element for each field
424     @raise errors.OpExecError: when an invalid field
425         has been passed
426
427     """
428     return _SimpleJobQuery(fields)(self)
429
430   def MarkUnfinishedOps(self, status, result):
431     """Mark unfinished opcodes with a given status and result.
432
433     This is an utility function for marking all running or waiting to
434     be run opcodes with a given status. Opcodes which are already
435     finalised are not changed.
436
437     @param status: a given opcode status
438     @param result: the opcode result
439
440     """
441     not_marked = True
442     for op in self.ops:
443       if op.status in constants.OPS_FINALIZED:
444         assert not_marked, "Finalized opcodes found after non-finalized ones"
445         continue
446       op.status = status
447       op.result = result
448       not_marked = False
449
450   def Finalize(self):
451     """Marks the job as finalized.
452
453     """
454     self.end_timestamp = TimeStampNow()
455
456   def Cancel(self):
457     """Marks job as canceled/-ing if possible.
458
459     @rtype: tuple; (bool, string)
460     @return: Boolean describing whether job was successfully canceled or marked
461       as canceling and a text message
462
463     """
464     status = self.CalcStatus()
465
466     if status == constants.JOB_STATUS_QUEUED:
467       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
468                              "Job canceled by request")
469       self.Finalize()
470       return (True, "Job %s canceled" % self.id)
471
472     elif status == constants.JOB_STATUS_WAITING:
473       # The worker will notice the new status and cancel the job
474       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
475       return (True, "Job %s will be canceled" % self.id)
476
477     else:
478       logging.debug("Job %s is no longer waiting in the queue", self.id)
479       return (False, "Job %s is no longer waiting in the queue" % self.id)
480
481
482 class _OpExecCallbacks(mcpu.OpExecCbBase):
483   def __init__(self, queue, job, op):
484     """Initializes this class.
485
486     @type queue: L{JobQueue}
487     @param queue: Job queue
488     @type job: L{_QueuedJob}
489     @param job: Job object
490     @type op: L{_QueuedOpCode}
491     @param op: OpCode
492
493     """
494     assert queue, "Queue is missing"
495     assert job, "Job is missing"
496     assert op, "Opcode is missing"
497
498     self._queue = queue
499     self._job = job
500     self._op = op
501
502   def _CheckCancel(self):
503     """Raises an exception to cancel the job if asked to.
504
505     """
506     # Cancel here if we were asked to
507     if self._op.status == constants.OP_STATUS_CANCELING:
508       logging.debug("Canceling opcode")
509       raise CancelJob()
510
511     # See if queue is shutting down
512     if not self._queue.AcceptingJobsUnlocked():
513       logging.debug("Queue is shutting down")
514       raise QueueShutdown()
515
516   @locking.ssynchronized(_QUEUE, shared=1)
517   def NotifyStart(self):
518     """Mark the opcode as running, not lock-waiting.
519
520     This is called from the mcpu code as a notifier function, when the LU is
521     finally about to start the Exec() method. Of course, to have end-user
522     visible results, the opcode must be initially (before calling into
523     Processor.ExecOpCode) set to OP_STATUS_WAITING.
524
525     """
526     assert self._op in self._job.ops
527     assert self._op.status in (constants.OP_STATUS_WAITING,
528                                constants.OP_STATUS_CANCELING)
529
530     # Cancel here if we were asked to
531     self._CheckCancel()
532
533     logging.debug("Opcode is now running")
534
535     self._op.status = constants.OP_STATUS_RUNNING
536     self._op.exec_timestamp = TimeStampNow()
537
538     # And finally replicate the job status
539     self._queue.UpdateJobUnlocked(self._job)
540
541   @locking.ssynchronized(_QUEUE, shared=1)
542   def _AppendFeedback(self, timestamp, log_type, log_msg):
543     """Internal feedback append function, with locks
544
545     """
546     self._job.log_serial += 1
547     self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
548     self._queue.UpdateJobUnlocked(self._job, replicate=False)
549
550   def Feedback(self, *args):
551     """Append a log entry.
552
553     """
554     assert len(args) < 3
555
556     if len(args) == 1:
557       log_type = constants.ELOG_MESSAGE
558       log_msg = args[0]
559     else:
560       (log_type, log_msg) = args
561
562     # The time is split to make serialization easier and not lose
563     # precision.
564     timestamp = utils.SplitTime(time.time())
565     self._AppendFeedback(timestamp, log_type, log_msg)
566
567   def CurrentPriority(self):
568     """Returns current priority for opcode.
569
570     """
571     assert self._op.status in (constants.OP_STATUS_WAITING,
572                                constants.OP_STATUS_CANCELING)
573
574     # Cancel here if we were asked to
575     self._CheckCancel()
576
577     return self._op.priority
578
579   def SubmitManyJobs(self, jobs):
580     """Submits jobs for processing.
581
582     See L{JobQueue.SubmitManyJobs}.
583
584     """
585     # Locking is done in job queue
586     return self._queue.SubmitManyJobs(jobs)
587
588
589 class _JobChangesChecker(object):
590   def __init__(self, fields, prev_job_info, prev_log_serial):
591     """Initializes this class.
592
593     @type fields: list of strings
594     @param fields: Fields requested by LUXI client
595     @type prev_job_info: string
596     @param prev_job_info: previous job info, as passed by the LUXI client
597     @type prev_log_serial: string
598     @param prev_log_serial: previous job serial, as passed by the LUXI client
599
600     """
601     self._squery = _SimpleJobQuery(fields)
602     self._prev_job_info = prev_job_info
603     self._prev_log_serial = prev_log_serial
604
605   def __call__(self, job):
606     """Checks whether job has changed.
607
608     @type job: L{_QueuedJob}
609     @param job: Job object
610
611     """
612     assert not job.writable, "Expected read-only job"
613
614     status = job.CalcStatus()
615     job_info = self._squery(job)
616     log_entries = job.GetLogEntries(self._prev_log_serial)
617
618     # Serializing and deserializing data can cause type changes (e.g. from
619     # tuple to list) or precision loss. We're doing it here so that we get
620     # the same modifications as the data received from the client. Without
621     # this, the comparison afterwards might fail without the data being
622     # significantly different.
623     # TODO: we just deserialized from disk, investigate how to make sure that
624     # the job info and log entries are compatible to avoid this further step.
625     # TODO: Doing something like in testutils.py:UnifyValueType might be more
626     # efficient, though floats will be tricky
627     job_info = serializer.LoadJson(serializer.DumpJson(job_info))
628     log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
629
630     # Don't even try to wait if the job is no longer running, there will be
631     # no changes.
632     if (status not in (constants.JOB_STATUS_QUEUED,
633                        constants.JOB_STATUS_RUNNING,
634                        constants.JOB_STATUS_WAITING) or
635         job_info != self._prev_job_info or
636         (log_entries and self._prev_log_serial != log_entries[0][0])):
637       logging.debug("Job %s changed", job.id)
638       return (job_info, log_entries)
639
640     return None
641
642
643 class _JobFileChangesWaiter(object):
644   def __init__(self, filename):
645     """Initializes this class.
646
647     @type filename: string
648     @param filename: Path to job file
649     @raises errors.InotifyError: if the notifier cannot be setup
650
651     """
652     self._wm = pyinotify.WatchManager()
653     self._inotify_handler = \
654       asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
655     self._notifier = \
656       pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
657     try:
658       self._inotify_handler.enable()
659     except Exception:
660       # pyinotify doesn't close file descriptors automatically
661       self._notifier.stop()
662       raise
663
664   def _OnInotify(self, notifier_enabled):
665     """Callback for inotify.
666
667     """
668     if not notifier_enabled:
669       self._inotify_handler.enable()
670
671   def Wait(self, timeout):
672     """Waits for the job file to change.
673
674     @type timeout: float
675     @param timeout: Timeout in seconds
676     @return: Whether there have been events
677
678     """
679     assert timeout >= 0
680     have_events = self._notifier.check_events(timeout * 1000)
681     if have_events:
682       self._notifier.read_events()
683     self._notifier.process_events()
684     return have_events
685
686   def Close(self):
687     """Closes underlying notifier and its file descriptor.
688
689     """
690     self._notifier.stop()
691
692
693 class _JobChangesWaiter(object):
694   def __init__(self, filename):
695     """Initializes this class.
696
697     @type filename: string
698     @param filename: Path to job file
699
700     """
701     self._filewaiter = None
702     self._filename = filename
703
704   def Wait(self, timeout):
705     """Waits for a job to change.
706
707     @type timeout: float
708     @param timeout: Timeout in seconds
709     @return: Whether there have been events
710
711     """
712     if self._filewaiter:
713       return self._filewaiter.Wait(timeout)
714
715     # Lazy setup: Avoid inotify setup cost when job file has already changed.
716     # If this point is reached, return immediately and let caller check the job
717     # file again in case there were changes since the last check. This avoids a
718     # race condition.
719     self._filewaiter = _JobFileChangesWaiter(self._filename)
720
721     return True
722
723   def Close(self):
724     """Closes underlying waiter.
725
726     """
727     if self._filewaiter:
728       self._filewaiter.Close()
729
730
731 class _WaitForJobChangesHelper(object):
732   """Helper class using inotify to wait for changes in a job file.
733
734   This class takes a previous job status and serial, and alerts the client when
735   the current job status has changed.
736
737   """
738   @staticmethod
739   def _CheckForChanges(counter, job_load_fn, check_fn):
740     if counter.next() > 0:
741       # If this isn't the first check the job is given some more time to change
742       # again. This gives better performance for jobs generating many
743       # changes/messages.
744       time.sleep(0.1)
745
746     job = job_load_fn()
747     if not job:
748       raise errors.JobLost()
749
750     result = check_fn(job)
751     if result is None:
752       raise utils.RetryAgain()
753
754     return result
755
756   def __call__(self, filename, job_load_fn,
757                fields, prev_job_info, prev_log_serial, timeout):
758     """Waits for changes on a job.
759
760     @type filename: string
761     @param filename: File on which to wait for changes
762     @type job_load_fn: callable
763     @param job_load_fn: Function to load job
764     @type fields: list of strings
765     @param fields: Which fields to check for changes
766     @type prev_job_info: list or None
767     @param prev_job_info: Last job information returned
768     @type prev_log_serial: int
769     @param prev_log_serial: Last job message serial number
770     @type timeout: float
771     @param timeout: maximum time to wait in seconds
772
773     """
774     counter = itertools.count()
775     try:
776       check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
777       waiter = _JobChangesWaiter(filename)
778       try:
779         return utils.Retry(compat.partial(self._CheckForChanges,
780                                           counter, job_load_fn, check_fn),
781                            utils.RETRY_REMAINING_TIME, timeout,
782                            wait_fn=waiter.Wait)
783       finally:
784         waiter.Close()
785     except (errors.InotifyError, errors.JobLost):
786       return None
787     except utils.RetryTimeout:
788       return constants.JOB_NOTCHANGED
789
790
791 def _EncodeOpError(err):
792   """Encodes an error which occurred while processing an opcode.
793
794   """
795   if isinstance(err, errors.GenericError):
796     to_encode = err
797   else:
798     to_encode = errors.OpExecError(str(err))
799
800   return errors.EncodeException(to_encode)
801
802
803 class _TimeoutStrategyWrapper:
804   def __init__(self, fn):
805     """Initializes this class.
806
807     """
808     self._fn = fn
809     self._next = None
810
811   def _Advance(self):
812     """Gets the next timeout if necessary.
813
814     """
815     if self._next is None:
816       self._next = self._fn()
817
818   def Peek(self):
819     """Returns the next timeout.
820
821     """
822     self._Advance()
823     return self._next
824
825   def Next(self):
826     """Returns the current timeout and advances the internal state.
827
828     """
829     self._Advance()
830     result = self._next
831     self._next = None
832     return result
833
834
835 class _OpExecContext:
836   def __init__(self, op, index, log_prefix, timeout_strategy_factory):
837     """Initializes this class.
838
839     """
840     self.op = op
841     self.index = index
842     self.log_prefix = log_prefix
843     self.summary = op.input.Summary()
844
845     # Create local copy to modify
846     if getattr(op.input, opcodes.DEPEND_ATTR, None):
847       self.jobdeps = op.input.depends[:]
848     else:
849       self.jobdeps = None
850
851     self._timeout_strategy_factory = timeout_strategy_factory
852     self._ResetTimeoutStrategy()
853
854   def _ResetTimeoutStrategy(self):
855     """Creates a new timeout strategy.
856
857     """
858     self._timeout_strategy = \
859       _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
860
861   def CheckPriorityIncrease(self):
862     """Checks whether priority can and should be increased.
863
864     Called when locks couldn't be acquired.
865
866     """
867     op = self.op
868
869     # Exhausted all retries and next round should not use blocking acquire
870     # for locks?
871     if (self._timeout_strategy.Peek() is None and
872         op.priority > constants.OP_PRIO_HIGHEST):
873       logging.debug("Increasing priority")
874       op.priority -= 1
875       self._ResetTimeoutStrategy()
876       return True
877
878     return False
879
880   def GetNextLockTimeout(self):
881     """Returns the next lock acquire timeout.
882
883     """
884     return self._timeout_strategy.Next()
885
886
887 class _JobProcessor(object):
888   (DEFER,
889    WAITDEP,
890    FINISHED) = range(1, 4)
891
892   def __init__(self, queue, opexec_fn, job,
893                _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
894     """Initializes this class.
895
896     """
897     self.queue = queue
898     self.opexec_fn = opexec_fn
899     self.job = job
900     self._timeout_strategy_factory = _timeout_strategy_factory
901
902   @staticmethod
903   def _FindNextOpcode(job, timeout_strategy_factory):
904     """Locates the next opcode to run.
905
906     @type job: L{_QueuedJob}
907     @param job: Job object
908     @param timeout_strategy_factory: Callable to create new timeout strategy
909
910     """
911     # Create some sort of a cache to speed up locating next opcode for future
912     # lookups
913     # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
914     # pending and one for processed ops.
915     if job.ops_iter is None:
916       job.ops_iter = enumerate(job.ops)
917
918     # Find next opcode to run
919     while True:
920       try:
921         (idx, op) = job.ops_iter.next()
922       except StopIteration:
923         raise errors.ProgrammerError("Called for a finished job")
924
925       if op.status == constants.OP_STATUS_RUNNING:
926         # Found an opcode already marked as running
927         raise errors.ProgrammerError("Called for job marked as running")
928
929       opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
930                              timeout_strategy_factory)
931
932       if op.status not in constants.OPS_FINALIZED:
933         return opctx
934
935       # This is a job that was partially completed before master daemon
936       # shutdown, so it can be expected that some opcodes are already
937       # completed successfully (if any did error out, then the whole job
938       # should have been aborted and not resubmitted for processing).
939       logging.info("%s: opcode %s already processed, skipping",
940                    opctx.log_prefix, opctx.summary)
941
942   @staticmethod
943   def _MarkWaitlock(job, op):
944     """Marks an opcode as waiting for locks.
945
946     The job's start timestamp is also set if necessary.
947
948     @type job: L{_QueuedJob}
949     @param job: Job object
950     @type op: L{_QueuedOpCode}
951     @param op: Opcode object
952
953     """
954     assert op in job.ops
955     assert op.status in (constants.OP_STATUS_QUEUED,
956                          constants.OP_STATUS_WAITING)
957
958     update = False
959
960     op.result = None
961
962     if op.status == constants.OP_STATUS_QUEUED:
963       op.status = constants.OP_STATUS_WAITING
964       update = True
965
966     if op.start_timestamp is None:
967       op.start_timestamp = TimeStampNow()
968       update = True
969
970     if job.start_timestamp is None:
971       job.start_timestamp = op.start_timestamp
972       update = True
973
974     assert op.status == constants.OP_STATUS_WAITING
975
976     return update
977
978   @staticmethod
979   def _CheckDependencies(queue, job, opctx):
980     """Checks if an opcode has dependencies and if so, processes them.
981
982     @type queue: L{JobQueue}
983     @param queue: Queue object
984     @type job: L{_QueuedJob}
985     @param job: Job object
986     @type opctx: L{_OpExecContext}
987     @param opctx: Opcode execution context
988     @rtype: bool
989     @return: Whether opcode will be re-scheduled by dependency tracker
990
991     """
992     op = opctx.op
993
994     result = False
995
996     while opctx.jobdeps:
997       (dep_job_id, dep_status) = opctx.jobdeps[0]
998
999       (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
1000                                                           dep_status)
1001       assert ht.TNonEmptyString(depmsg), "No dependency message"
1002
1003       logging.info("%s: %s", opctx.log_prefix, depmsg)
1004
1005       if depresult == _JobDependencyManager.CONTINUE:
1006         # Remove dependency and continue
1007         opctx.jobdeps.pop(0)
1008
1009       elif depresult == _JobDependencyManager.WAIT:
1010         # Need to wait for notification, dependency tracker will re-add job
1011         # to workerpool
1012         result = True
1013         break
1014
1015       elif depresult == _JobDependencyManager.CANCEL:
1016         # Job was cancelled, cancel this job as well
1017         job.Cancel()
1018         assert op.status == constants.OP_STATUS_CANCELING
1019         break
1020
1021       elif depresult in (_JobDependencyManager.WRONGSTATUS,
1022                          _JobDependencyManager.ERROR):
1023         # Job failed or there was an error, this job must fail
1024         op.status = constants.OP_STATUS_ERROR
1025         op.result = _EncodeOpError(errors.OpExecError(depmsg))
1026         break
1027
1028       else:
1029         raise errors.ProgrammerError("Unknown dependency result '%s'" %
1030                                      depresult)
1031
1032     return result
1033
1034   def _ExecOpCodeUnlocked(self, opctx):
1035     """Processes one opcode and returns the result.
1036
1037     """
1038     op = opctx.op
1039
1040     assert op.status == constants.OP_STATUS_WAITING
1041
1042     timeout = opctx.GetNextLockTimeout()
1043
1044     try:
1045       # Make sure not to hold queue lock while calling ExecOpCode
1046       result = self.opexec_fn(op.input,
1047                               _OpExecCallbacks(self.queue, self.job, op),
1048                               timeout=timeout)
1049     except mcpu.LockAcquireTimeout:
1050       assert timeout is not None, "Received timeout for blocking acquire"
1051       logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1052
1053       assert op.status in (constants.OP_STATUS_WAITING,
1054                            constants.OP_STATUS_CANCELING)
1055
1056       # Was job cancelled while we were waiting for the lock?
1057       if op.status == constants.OP_STATUS_CANCELING:
1058         return (constants.OP_STATUS_CANCELING, None)
1059
1060       # Queue is shutting down, return to queued
1061       if not self.queue.AcceptingJobsUnlocked():
1062         return (constants.OP_STATUS_QUEUED, None)
1063
1064       # Stay in waitlock while trying to re-acquire lock
1065       return (constants.OP_STATUS_WAITING, None)
1066     except CancelJob:
1067       logging.exception("%s: Canceling job", opctx.log_prefix)
1068       assert op.status == constants.OP_STATUS_CANCELING
1069       return (constants.OP_STATUS_CANCELING, None)
1070
1071     except QueueShutdown:
1072       logging.exception("%s: Queue is shutting down", opctx.log_prefix)
1073
1074       assert op.status == constants.OP_STATUS_WAITING
1075
1076       # Job hadn't been started yet, so it should return to the queue
1077       return (constants.OP_STATUS_QUEUED, None)
1078
1079     except Exception, err: # pylint: disable=W0703
1080       logging.exception("%s: Caught exception in %s",
1081                         opctx.log_prefix, opctx.summary)
1082       return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1083     else:
1084       logging.debug("%s: %s successful",
1085                     opctx.log_prefix, opctx.summary)
1086       return (constants.OP_STATUS_SUCCESS, result)
1087
1088   def __call__(self, _nextop_fn=None):
1089     """Continues execution of a job.
1090
1091     @param _nextop_fn: Callback function for tests
1092     @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1093       be deferred and C{WAITDEP} if the dependency manager
1094       (L{_JobDependencyManager}) will re-schedule the job when appropriate
1095
1096     """
1097     queue = self.queue
1098     job = self.job
1099
1100     logging.debug("Processing job %s", job.id)
1101
1102     queue.acquire(shared=1)
1103     try:
1104       opcount = len(job.ops)
1105
1106       assert job.writable, "Expected writable job"
1107
1108       # Don't do anything for finalized jobs
1109       if job.CalcStatus() in constants.JOBS_FINALIZED:
1110         return self.FINISHED
1111
1112       # Is a previous opcode still pending?
1113       if job.cur_opctx:
1114         opctx = job.cur_opctx
1115         job.cur_opctx = None
1116       else:
1117         if __debug__ and _nextop_fn:
1118           _nextop_fn()
1119         opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1120
1121       op = opctx.op
1122
1123       # Consistency check
1124       assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1125                                      constants.OP_STATUS_CANCELING)
1126                         for i in job.ops[opctx.index + 1:])
1127
1128       assert op.status in (constants.OP_STATUS_QUEUED,
1129                            constants.OP_STATUS_WAITING,
1130                            constants.OP_STATUS_CANCELING)
1131
1132       assert (op.priority <= constants.OP_PRIO_LOWEST and
1133               op.priority >= constants.OP_PRIO_HIGHEST)
1134
1135       waitjob = None
1136
1137       if op.status != constants.OP_STATUS_CANCELING:
1138         assert op.status in (constants.OP_STATUS_QUEUED,
1139                              constants.OP_STATUS_WAITING)
1140
1141         # Prepare to start opcode
1142         if self._MarkWaitlock(job, op):
1143           # Write to disk
1144           queue.UpdateJobUnlocked(job)
1145
1146         assert op.status == constants.OP_STATUS_WAITING
1147         assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1148         assert job.start_timestamp and op.start_timestamp
1149         assert waitjob is None
1150
1151         # Check if waiting for a job is necessary
1152         waitjob = self._CheckDependencies(queue, job, opctx)
1153
1154         assert op.status in (constants.OP_STATUS_WAITING,
1155                              constants.OP_STATUS_CANCELING,
1156                              constants.OP_STATUS_ERROR)
1157
1158         if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1159                                          constants.OP_STATUS_ERROR)):
1160           logging.info("%s: opcode %s waiting for locks",
1161                        opctx.log_prefix, opctx.summary)
1162
1163           assert not opctx.jobdeps, "Not all dependencies were removed"
1164
1165           queue.release()
1166           try:
1167             (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1168           finally:
1169             queue.acquire(shared=1)
1170
1171           op.status = op_status
1172           op.result = op_result
1173
1174           assert not waitjob
1175
1176         if op.status in (constants.OP_STATUS_WAITING,
1177                          constants.OP_STATUS_QUEUED):
1178           # waiting: Couldn't get locks in time
1179           # queued: Queue is shutting down
1180           assert not op.end_timestamp
1181         else:
1182           # Finalize opcode
1183           op.end_timestamp = TimeStampNow()
1184
1185           if op.status == constants.OP_STATUS_CANCELING:
1186             assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1187                                   for i in job.ops[opctx.index:])
1188           else:
1189             assert op.status in constants.OPS_FINALIZED
1190
1191       if op.status == constants.OP_STATUS_QUEUED:
1192         # Queue is shutting down
1193         assert not waitjob
1194
1195         finalize = False
1196
1197         # Reset context
1198         job.cur_opctx = None
1199
1200         # In no case must the status be finalized here
1201         assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1202
1203       elif op.status == constants.OP_STATUS_WAITING or waitjob:
1204         finalize = False
1205
1206         if not waitjob and opctx.CheckPriorityIncrease():
1207           # Priority was changed, need to update on-disk file
1208           queue.UpdateJobUnlocked(job)
1209
1210         # Keep around for another round
1211         job.cur_opctx = opctx
1212
1213         assert (op.priority <= constants.OP_PRIO_LOWEST and
1214                 op.priority >= constants.OP_PRIO_HIGHEST)
1215
1216         # In no case must the status be finalized here
1217         assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1218
1219       else:
1220         # Ensure all opcodes so far have been successful
1221         assert (opctx.index == 0 or
1222                 compat.all(i.status == constants.OP_STATUS_SUCCESS
1223                            for i in job.ops[:opctx.index]))
1224
1225         # Reset context
1226         job.cur_opctx = None
1227
1228         if op.status == constants.OP_STATUS_SUCCESS:
1229           finalize = False
1230
1231         elif op.status == constants.OP_STATUS_ERROR:
1232           # Ensure failed opcode has an exception as its result
1233           assert errors.GetEncodedError(job.ops[opctx.index].result)
1234
1235           to_encode = errors.OpExecError("Preceding opcode failed")
1236           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1237                                 _EncodeOpError(to_encode))
1238           finalize = True
1239
1240           # Consistency check
1241           assert compat.all(i.status == constants.OP_STATUS_ERROR and
1242                             errors.GetEncodedError(i.result)
1243                             for i in job.ops[opctx.index:])
1244
1245         elif op.status == constants.OP_STATUS_CANCELING:
1246           job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1247                                 "Job canceled by request")
1248           finalize = True
1249
1250         else:
1251           raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1252
1253         if opctx.index == (opcount - 1):
1254           # Finalize on last opcode
1255           finalize = True
1256
1257         if finalize:
1258           # All opcodes have been run, finalize job
1259           job.Finalize()
1260
1261         # Write to disk. If the job status is final, this is the final write
1262         # allowed. Once the file has been written, it can be archived anytime.
1263         queue.UpdateJobUnlocked(job)
1264
1265         assert not waitjob
1266
1267         if finalize:
1268           logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1269           return self.FINISHED
1270
1271       assert not waitjob or queue.depmgr.JobWaiting(job)
1272
1273       if waitjob:
1274         return self.WAITDEP
1275       else:
1276         return self.DEFER
1277     finally:
1278       assert job.writable, "Job became read-only while being processed"
1279       queue.release()
1280
1281
1282 def _EvaluateJobProcessorResult(depmgr, job, result):
1283   """Looks at a result from L{_JobProcessor} for a job.
1284
1285   To be used in a L{_JobQueueWorker}.
1286
1287   """
1288   if result == _JobProcessor.FINISHED:
1289     # Notify waiting jobs
1290     depmgr.NotifyWaiters(job.id)
1291
1292   elif result == _JobProcessor.DEFER:
1293     # Schedule again
1294     raise workerpool.DeferTask(priority=job.CalcPriority())
1295
1296   elif result == _JobProcessor.WAITDEP:
1297     # No-op, dependency manager will re-schedule
1298     pass
1299
1300   else:
1301     raise errors.ProgrammerError("Job processor returned unknown status %s" %
1302                                  (result, ))
1303
1304
1305 class _JobQueueWorker(workerpool.BaseWorker):
1306   """The actual job workers.
1307
1308   """
1309   def RunTask(self, job): # pylint: disable=W0221
1310     """Job executor.
1311
1312     @type job: L{_QueuedJob}
1313     @param job: the job to be processed
1314
1315     """
1316     assert job.writable, "Expected writable job"
1317
1318     # Ensure only one worker is active on a single job. If a job registers for
1319     # a dependency job, and the other job notifies before the first worker is
1320     # done, the job can end up in the tasklist more than once.
1321     job.processor_lock.acquire()
1322     try:
1323       return self._RunTaskInner(job)
1324     finally:
1325       job.processor_lock.release()
1326
1327   def _RunTaskInner(self, job):
1328     """Executes a job.
1329
1330     Must be called with per-job lock acquired.
1331
1332     """
1333     queue = job.queue
1334     assert queue == self.pool.queue
1335
1336     setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1337     setname_fn(None)
1338
1339     proc = mcpu.Processor(queue.context, job.id)
1340
1341     # Create wrapper for setting thread name
1342     wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1343                                     proc.ExecOpCode)
1344
1345     _EvaluateJobProcessorResult(queue.depmgr, job,
1346                                 _JobProcessor(queue, wrap_execop_fn, job)())
1347
1348   @staticmethod
1349   def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1350     """Updates the worker thread name to include a short summary of the opcode.
1351
1352     @param setname_fn: Callable setting worker thread name
1353     @param execop_fn: Callable for executing opcode (usually
1354                       L{mcpu.Processor.ExecOpCode})
1355
1356     """
1357     setname_fn(op)
1358     try:
1359       return execop_fn(op, *args, **kwargs)
1360     finally:
1361       setname_fn(None)
1362
1363   @staticmethod
1364   def _GetWorkerName(job, op):
1365     """Sets the worker thread name.
1366
1367     @type job: L{_QueuedJob}
1368     @type op: L{opcodes.OpCode}
1369
1370     """
1371     parts = ["Job%s" % job.id]
1372
1373     if op:
1374       parts.append(op.TinySummary())
1375
1376     return "/".join(parts)
1377
1378
1379 class _JobQueueWorkerPool(workerpool.WorkerPool):
1380   """Simple class implementing a job-processing workerpool.
1381
1382   """
1383   def __init__(self, queue):
1384     super(_JobQueueWorkerPool, self).__init__("Jq",
1385                                               JOBQUEUE_THREADS,
1386                                               _JobQueueWorker)
1387     self.queue = queue
1388
1389
1390 class _JobDependencyManager:
1391   """Keeps track of job dependencies.
1392
1393   """
1394   (WAIT,
1395    ERROR,
1396    CANCEL,
1397    CONTINUE,
1398    WRONGSTATUS) = range(1, 6)
1399
1400   def __init__(self, getstatus_fn, enqueue_fn):
1401     """Initializes this class.
1402
1403     """
1404     self._getstatus_fn = getstatus_fn
1405     self._enqueue_fn = enqueue_fn
1406
1407     self._waiters = {}
1408     self._lock = locking.SharedLock("JobDepMgr")
1409
1410   @locking.ssynchronized(_LOCK, shared=1)
1411   def GetLockInfo(self, requested): # pylint: disable=W0613
1412     """Retrieves information about waiting jobs.
1413
1414     @type requested: set
1415     @param requested: Requested information, see C{query.LQ_*}
1416
1417     """
1418     # No need to sort here, that's being done by the lock manager and query
1419     # library. There are no priorities for notifying jobs, hence all show up as
1420     # one item under "pending".
1421     return [("job/%s" % job_id, None, None,
1422              [("job", [job.id for job in waiters])])
1423             for job_id, waiters in self._waiters.items()
1424             if waiters]
1425
1426   @locking.ssynchronized(_LOCK, shared=1)
1427   def JobWaiting(self, job):
1428     """Checks if a job is waiting.
1429
1430     """
1431     return compat.any(job in jobs
1432                       for jobs in self._waiters.values())
1433
1434   @locking.ssynchronized(_LOCK)
1435   def CheckAndRegister(self, job, dep_job_id, dep_status):
1436     """Checks if a dependency job has the requested status.
1437
1438     If the other job is not yet in a finalized status, the calling job will be
1439     notified (re-added to the workerpool) at a later point.
1440
1441     @type job: L{_QueuedJob}
1442     @param job: Job object
1443     @type dep_job_id: int
1444     @param dep_job_id: ID of dependency job
1445     @type dep_status: list
1446     @param dep_status: Required status
1447
1448     """
1449     assert ht.TJobId(job.id)
1450     assert ht.TJobId(dep_job_id)
1451     assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1452
1453     if job.id == dep_job_id:
1454       return (self.ERROR, "Job can't depend on itself")
1455
1456     # Get status of dependency job
1457     try:
1458       status = self._getstatus_fn(dep_job_id)
1459     except errors.JobLost, err:
1460       return (self.ERROR, "Dependency error: %s" % err)
1461
1462     assert status in constants.JOB_STATUS_ALL
1463
1464     job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1465
1466     if status not in constants.JOBS_FINALIZED:
1467       # Register for notification and wait for job to finish
1468       job_id_waiters.add(job)
1469       return (self.WAIT,
1470               "Need to wait for job %s, wanted status '%s'" %
1471               (dep_job_id, dep_status))
1472
1473     # Remove from waiters list
1474     if job in job_id_waiters:
1475       job_id_waiters.remove(job)
1476
1477     if (status == constants.JOB_STATUS_CANCELED and
1478         constants.JOB_STATUS_CANCELED not in dep_status):
1479       return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1480
1481     elif not dep_status or status in dep_status:
1482       return (self.CONTINUE,
1483               "Dependency job %s finished with status '%s'" %
1484               (dep_job_id, status))
1485
1486     else:
1487       return (self.WRONGSTATUS,
1488               "Dependency job %s finished with status '%s',"
1489               " not one of '%s' as required" %
1490               (dep_job_id, status, utils.CommaJoin(dep_status)))
1491
1492   def _RemoveEmptyWaitersUnlocked(self):
1493     """Remove all jobs without actual waiters.
1494
1495     """
1496     for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1497                    if not waiters]:
1498       del self._waiters[job_id]
1499
1500   def NotifyWaiters(self, job_id):
1501     """Notifies all jobs waiting for a certain job ID.
1502
1503     @attention: Do not call until L{CheckAndRegister} returned a status other
1504       than C{WAITDEP} for C{job_id}, or behaviour is undefined
1505     @type job_id: int
1506     @param job_id: Job ID
1507
1508     """
1509     assert ht.TJobId(job_id)
1510
1511     self._lock.acquire()
1512     try:
1513       self._RemoveEmptyWaitersUnlocked()
1514
1515       jobs = self._waiters.pop(job_id, None)
1516     finally:
1517       self._lock.release()
1518
1519     if jobs:
1520       # Re-add jobs to workerpool
1521       logging.debug("Re-adding %s jobs which were waiting for job %s",
1522                     len(jobs), job_id)
1523       self._enqueue_fn(jobs)
1524
1525
1526 def _RequireOpenQueue(fn):
1527   """Decorator for "public" functions.
1528
1529   This function should be used for all 'public' functions. That is,
1530   functions usually called from other classes. Note that this should
1531   be applied only to methods (not plain functions), since it expects
1532   that the decorated function is called with a first argument that has
1533   a '_queue_filelock' argument.
1534
1535   @warning: Use this decorator only after locking.ssynchronized
1536
1537   Example::
1538     @locking.ssynchronized(_LOCK)
1539     @_RequireOpenQueue
1540     def Example(self):
1541       pass
1542
1543   """
1544   def wrapper(self, *args, **kwargs):
1545     # pylint: disable=W0212
1546     assert self._queue_filelock is not None, "Queue should be open"
1547     return fn(self, *args, **kwargs)
1548   return wrapper
1549
1550
1551 def _RequireNonDrainedQueue(fn):
1552   """Decorator checking for a non-drained queue.
1553
1554   To be used with functions submitting new jobs.
1555
1556   """
1557   def wrapper(self, *args, **kwargs):
1558     """Wrapper function.
1559
1560     @raise errors.JobQueueDrainError: if the job queue is marked for draining
1561
1562     """
1563     # Ok when sharing the big job queue lock, as the drain file is created when
1564     # the lock is exclusive.
1565     # Needs access to protected member, pylint: disable=W0212
1566     if self._drained:
1567       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1568
1569     if not self._accepting_jobs:
1570       raise errors.JobQueueError("Job queue is shutting down, refusing job")
1571
1572     return fn(self, *args, **kwargs)
1573   return wrapper
1574
1575
1576 class JobQueue(object):
1577   """Queue used to manage the jobs.
1578
1579   """
1580   def __init__(self, context):
1581     """Constructor for JobQueue.
1582
1583     The constructor will initialize the job queue object and then
1584     start loading the current jobs from disk, either for starting them
1585     (if they were queue) or for aborting them (if they were already
1586     running).
1587
1588     @type context: GanetiContext
1589     @param context: the context object for access to the configuration
1590         data and other ganeti objects
1591
1592     """
1593     self.context = context
1594     self._memcache = weakref.WeakValueDictionary()
1595     self._my_hostname = netutils.Hostname.GetSysName()
1596
1597     # The Big JobQueue lock. If a code block or method acquires it in shared
1598     # mode safe it must guarantee concurrency with all the code acquiring it in
1599     # shared mode, including itself. In order not to acquire it at all
1600     # concurrency must be guaranteed with all code acquiring it in shared mode
1601     # and all code acquiring it exclusively.
1602     self._lock = locking.SharedLock("JobQueue")
1603
1604     self.acquire = self._lock.acquire
1605     self.release = self._lock.release
1606
1607     # Accept jobs by default
1608     self._accepting_jobs = True
1609
1610     # Initialize the queue, and acquire the filelock.
1611     # This ensures no other process is working on the job queue.
1612     self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1613
1614     # Read serial file
1615     self._last_serial = jstore.ReadSerial()
1616     assert self._last_serial is not None, ("Serial file was modified between"
1617                                            " check in jstore and here")
1618
1619     # Get initial list of nodes
1620     self._nodes = dict((n.name, n.primary_ip)
1621                        for n in self.context.cfg.GetAllNodesInfo().values()
1622                        if n.master_candidate)
1623
1624     # Remove master node
1625     self._nodes.pop(self._my_hostname, None)
1626
1627     # TODO: Check consistency across nodes
1628
1629     self._queue_size = None
1630     self._UpdateQueueSizeUnlocked()
1631     assert ht.TInt(self._queue_size)
1632     self._drained = jstore.CheckDrainFlag()
1633
1634     # Job dependencies
1635     self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1636                                         self._EnqueueJobs)
1637     self.context.glm.AddToLockMonitor(self.depmgr)
1638
1639     # Setup worker pool
1640     self._wpool = _JobQueueWorkerPool(self)
1641     try:
1642       self._InspectQueue()
1643     except:
1644       self._wpool.TerminateWorkers()
1645       raise
1646
1647   @locking.ssynchronized(_LOCK)
1648   @_RequireOpenQueue
1649   def _InspectQueue(self):
1650     """Loads the whole job queue and resumes unfinished jobs.
1651
1652     This function needs the lock here because WorkerPool.AddTask() may start a
1653     job while we're still doing our work.
1654
1655     """
1656     logging.info("Inspecting job queue")
1657
1658     restartjobs = []
1659
1660     all_job_ids = self._GetJobIDsUnlocked()
1661     jobs_count = len(all_job_ids)
1662     lastinfo = time.time()
1663     for idx, job_id in enumerate(all_job_ids):
1664       # Give an update every 1000 jobs or 10 seconds
1665       if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1666           idx == (jobs_count - 1)):
1667         logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1668                      idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1669         lastinfo = time.time()
1670
1671       job = self._LoadJobUnlocked(job_id)
1672
1673       # a failure in loading the job can cause 'None' to be returned
1674       if job is None:
1675         continue
1676
1677       status = job.CalcStatus()
1678
1679       if status == constants.JOB_STATUS_QUEUED:
1680         restartjobs.append(job)
1681
1682       elif status in (constants.JOB_STATUS_RUNNING,
1683                       constants.JOB_STATUS_WAITING,
1684                       constants.JOB_STATUS_CANCELING):
1685         logging.warning("Unfinished job %s found: %s", job.id, job)
1686
1687         if status == constants.JOB_STATUS_WAITING:
1688           # Restart job
1689           job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1690           restartjobs.append(job)
1691         else:
1692           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1693                                 "Unclean master daemon shutdown")
1694           job.Finalize()
1695
1696         self.UpdateJobUnlocked(job)
1697
1698     if restartjobs:
1699       logging.info("Restarting %s jobs", len(restartjobs))
1700       self._EnqueueJobsUnlocked(restartjobs)
1701
1702     logging.info("Job queue inspection finished")
1703
1704   def _GetRpc(self, address_list):
1705     """Gets RPC runner with context.
1706
1707     """
1708     return rpc.JobQueueRunner(self.context, address_list)
1709
1710   @locking.ssynchronized(_LOCK)
1711   @_RequireOpenQueue
1712   def AddNode(self, node):
1713     """Register a new node with the queue.
1714
1715     @type node: L{objects.Node}
1716     @param node: the node object to be added
1717
1718     """
1719     node_name = node.name
1720     assert node_name != self._my_hostname
1721
1722     # Clean queue directory on added node
1723     result = self._GetRpc(None).call_jobqueue_purge(node_name)
1724     msg = result.fail_msg
1725     if msg:
1726       logging.warning("Cannot cleanup queue directory on node %s: %s",
1727                       node_name, msg)
1728
1729     if not node.master_candidate:
1730       # remove if existing, ignoring errors
1731       self._nodes.pop(node_name, None)
1732       # and skip the replication of the job ids
1733       return
1734
1735     # Upload the whole queue excluding archived jobs
1736     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1737
1738     # Upload current serial file
1739     files.append(pathutils.JOB_QUEUE_SERIAL_FILE)
1740
1741     # Static address list
1742     addrs = [node.primary_ip]
1743
1744     for file_name in files:
1745       # Read file content
1746       content = utils.ReadFile(file_name)
1747
1748       result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
1749                              file_name, content)
1750       msg = result[node_name].fail_msg
1751       if msg:
1752         logging.error("Failed to upload file %s to node %s: %s",
1753                       file_name, node_name, msg)
1754
1755     self._nodes[node_name] = node.primary_ip
1756
1757   @locking.ssynchronized(_LOCK)
1758   @_RequireOpenQueue
1759   def RemoveNode(self, node_name):
1760     """Callback called when removing nodes from the cluster.
1761
1762     @type node_name: str
1763     @param node_name: the name of the node to remove
1764
1765     """
1766     self._nodes.pop(node_name, None)
1767
1768   @staticmethod
1769   def _CheckRpcResult(result, nodes, failmsg):
1770     """Verifies the status of an RPC call.
1771
1772     Since we aim to keep consistency should this node (the current
1773     master) fail, we will log errors if our rpc fail, and especially
1774     log the case when more than half of the nodes fails.
1775
1776     @param result: the data as returned from the rpc call
1777     @type nodes: list
1778     @param nodes: the list of nodes we made the call to
1779     @type failmsg: str
1780     @param failmsg: the identifier to be used for logging
1781
1782     """
1783     failed = []
1784     success = []
1785
1786     for node in nodes:
1787       msg = result[node].fail_msg
1788       if msg:
1789         failed.append(node)
1790         logging.error("RPC call %s (%s) failed on node %s: %s",
1791                       result[node].call, failmsg, node, msg)
1792       else:
1793         success.append(node)
1794
1795     # +1 for the master node
1796     if (len(success) + 1) < len(failed):
1797       # TODO: Handle failing nodes
1798       logging.error("More than half of the nodes failed")
1799
1800   def _GetNodeIp(self):
1801     """Helper for returning the node name/ip list.
1802
1803     @rtype: (list, list)
1804     @return: a tuple of two lists, the first one with the node
1805         names and the second one with the node addresses
1806
1807     """
1808     # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1809     name_list = self._nodes.keys()
1810     addr_list = [self._nodes[name] for name in name_list]
1811     return name_list, addr_list
1812
1813   def _UpdateJobQueueFile(self, file_name, data, replicate):
1814     """Writes a file locally and then replicates it to all nodes.
1815
1816     This function will replace the contents of a file on the local
1817     node and then replicate it to all the other nodes we have.
1818
1819     @type file_name: str
1820     @param file_name: the path of the file to be replicated
1821     @type data: str
1822     @param data: the new contents of the file
1823     @type replicate: boolean
1824     @param replicate: whether to spread the changes to the remote nodes
1825
1826     """
1827     getents = runtime.GetEnts()
1828     utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1829                     gid=getents.masterd_gid)
1830
1831     if replicate:
1832       names, addrs = self._GetNodeIp()
1833       result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
1834       self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1835
1836   def _RenameFilesUnlocked(self, rename):
1837     """Renames a file locally and then replicate the change.
1838
1839     This function will rename a file in the local queue directory
1840     and then replicate this rename to all the other nodes we have.
1841
1842     @type rename: list of (old, new)
1843     @param rename: List containing tuples mapping old to new names
1844
1845     """
1846     # Rename them locally
1847     for old, new in rename:
1848       utils.RenameFile(old, new, mkdir=True)
1849
1850     # ... and on all nodes
1851     names, addrs = self._GetNodeIp()
1852     result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
1853     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1854
1855   def _NewSerialsUnlocked(self, count):
1856     """Generates a new job identifier.
1857
1858     Job identifiers are unique during the lifetime of a cluster.
1859
1860     @type count: integer
1861     @param count: how many serials to return
1862     @rtype: list of int
1863     @return: a list of job identifiers.
1864
1865     """
1866     assert ht.TPositiveInt(count)
1867
1868     # New number
1869     serial = self._last_serial + count
1870
1871     # Write to file
1872     self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
1873                              "%s\n" % serial, True)
1874
1875     result = [jstore.FormatJobID(v)
1876               for v in range(self._last_serial + 1, serial + 1)]
1877
1878     # Keep it only if we were able to write the file
1879     self._last_serial = serial
1880
1881     assert len(result) == count
1882
1883     return result
1884
1885   @staticmethod
1886   def _GetJobPath(job_id):
1887     """Returns the job file for a given job id.
1888
1889     @type job_id: str
1890     @param job_id: the job identifier
1891     @rtype: str
1892     @return: the path to the job file
1893
1894     """
1895     return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
1896
1897   @staticmethod
1898   def _GetArchivedJobPath(job_id):
1899     """Returns the archived job file for a give job id.
1900
1901     @type job_id: str
1902     @param job_id: the job identifier
1903     @rtype: str
1904     @return: the path to the archived job file
1905
1906     """
1907     return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
1908                           jstore.GetArchiveDirectory(job_id),
1909                           "job-%s" % job_id)
1910
1911   @staticmethod
1912   def _DetermineJobDirectories(archived):
1913     """Build list of directories containing job files.
1914
1915     @type archived: bool
1916     @param archived: Whether to include directories for archived jobs
1917     @rtype: list
1918
1919     """
1920     result = [pathutils.QUEUE_DIR]
1921
1922     if archived:
1923       archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
1924       result.extend(map(compat.partial(utils.PathJoin, archive_path),
1925                         utils.ListVisibleFiles(archive_path)))
1926
1927     return result
1928
1929   @classmethod
1930   def _GetJobIDsUnlocked(cls, sort=True, archived=False):
1931     """Return all known job IDs.
1932
1933     The method only looks at disk because it's a requirement that all
1934     jobs are present on disk (so in the _memcache we don't have any
1935     extra IDs).
1936
1937     @type sort: boolean
1938     @param sort: perform sorting on the returned job ids
1939     @rtype: list
1940     @return: the list of job IDs
1941
1942     """
1943     jlist = []
1944
1945     for path in cls._DetermineJobDirectories(archived):
1946       for filename in utils.ListVisibleFiles(path):
1947         m = constants.JOB_FILE_RE.match(filename)
1948         if m:
1949           jlist.append(int(m.group(1)))
1950
1951     if sort:
1952       jlist.sort()
1953     return jlist
1954
1955   def _LoadJobUnlocked(self, job_id):
1956     """Loads a job from the disk or memory.
1957
1958     Given a job id, this will return the cached job object if
1959     existing, or try to load the job from the disk. If loading from
1960     disk, it will also add the job to the cache.
1961
1962     @type job_id: int
1963     @param job_id: the job id
1964     @rtype: L{_QueuedJob} or None
1965     @return: either None or the job object
1966
1967     """
1968     job = self._memcache.get(job_id, None)
1969     if job:
1970       logging.debug("Found job %s in memcache", job_id)
1971       assert job.writable, "Found read-only job in memcache"
1972       return job
1973
1974     try:
1975       job = self._LoadJobFromDisk(job_id, False)
1976       if job is None:
1977         return job
1978     except errors.JobFileCorrupted:
1979       old_path = self._GetJobPath(job_id)
1980       new_path = self._GetArchivedJobPath(job_id)
1981       if old_path == new_path:
1982         # job already archived (future case)
1983         logging.exception("Can't parse job %s", job_id)
1984       else:
1985         # non-archived case
1986         logging.exception("Can't parse job %s, will archive.", job_id)
1987         self._RenameFilesUnlocked([(old_path, new_path)])
1988       return None
1989
1990     assert job.writable, "Job just loaded is not writable"
1991
1992     self._memcache[job_id] = job
1993     logging.debug("Added job %s to the cache", job_id)
1994     return job
1995
1996   def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
1997     """Load the given job file from disk.
1998
1999     Given a job file, read, load and restore it in a _QueuedJob format.
2000
2001     @type job_id: int
2002     @param job_id: job identifier
2003     @type try_archived: bool
2004     @param try_archived: Whether to try loading an archived job
2005     @rtype: L{_QueuedJob} or None
2006     @return: either None or the job object
2007
2008     """
2009     path_functions = [(self._GetJobPath, False)]
2010
2011     if try_archived:
2012       path_functions.append((self._GetArchivedJobPath, True))
2013
2014     raw_data = None
2015     archived = None
2016
2017     for (fn, archived) in path_functions:
2018       filepath = fn(job_id)
2019       logging.debug("Loading job from %s", filepath)
2020       try:
2021         raw_data = utils.ReadFile(filepath)
2022       except EnvironmentError, err:
2023         if err.errno != errno.ENOENT:
2024           raise
2025       else:
2026         break
2027
2028     if not raw_data:
2029       return None
2030
2031     if writable is None:
2032       writable = not archived
2033
2034     try:
2035       data = serializer.LoadJson(raw_data)
2036       job = _QueuedJob.Restore(self, data, writable, archived)
2037     except Exception, err: # pylint: disable=W0703
2038       raise errors.JobFileCorrupted(err)
2039
2040     return job
2041
2042   def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
2043     """Load the given job file from disk.
2044
2045     Given a job file, read, load and restore it in a _QueuedJob format.
2046     In case of error reading the job, it gets returned as None, and the
2047     exception is logged.
2048
2049     @type job_id: int
2050     @param job_id: job identifier
2051     @type try_archived: bool
2052     @param try_archived: Whether to try loading an archived job
2053     @rtype: L{_QueuedJob} or None
2054     @return: either None or the job object
2055
2056     """
2057     try:
2058       return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
2059     except (errors.JobFileCorrupted, EnvironmentError):
2060       logging.exception("Can't load/parse job %s", job_id)
2061       return None
2062
2063   def _UpdateQueueSizeUnlocked(self):
2064     """Update the queue size.
2065
2066     """
2067     self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
2068
2069   @locking.ssynchronized(_LOCK)
2070   @_RequireOpenQueue
2071   def SetDrainFlag(self, drain_flag):
2072     """Sets the drain flag for the queue.
2073
2074     @type drain_flag: boolean
2075     @param drain_flag: Whether to set or unset the drain flag
2076
2077     """
2078     jstore.SetDrainFlag(drain_flag)
2079
2080     self._drained = drain_flag
2081
2082     return True
2083
2084   @_RequireOpenQueue
2085   def _SubmitJobUnlocked(self, job_id, ops):
2086     """Create and store a new job.
2087
2088     This enters the job into our job queue and also puts it on the new
2089     queue, in order for it to be picked up by the queue processors.
2090
2091     @type job_id: job ID
2092     @param job_id: the job ID for the new job
2093     @type ops: list
2094     @param ops: The list of OpCodes that will become the new job.
2095     @rtype: L{_QueuedJob}
2096     @return: the job object to be queued
2097     @raise errors.JobQueueFull: if the job queue has too many jobs in it
2098     @raise errors.GenericError: If an opcode is not valid
2099
2100     """
2101     if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
2102       raise errors.JobQueueFull()
2103
2104     job = _QueuedJob(self, job_id, ops, True)
2105
2106     for idx, op in enumerate(job.ops):
2107       # Check priority
2108       if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
2109         allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2110         raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
2111                                   " are %s" % (idx, op.priority, allowed))
2112
2113       # Check job dependencies
2114       dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
2115       if not opcodes.TNoRelativeJobDependencies(dependencies):
2116         raise errors.GenericError("Opcode %s has invalid dependencies, must"
2117                                   " match %s: %s" %
2118                                   (idx, opcodes.TNoRelativeJobDependencies,
2119                                    dependencies))
2120
2121     # Write to disk
2122     self.UpdateJobUnlocked(job)
2123
2124     self._queue_size += 1
2125
2126     logging.debug("Adding new job %s to the cache", job_id)
2127     self._memcache[job_id] = job
2128
2129     return job
2130
2131   @locking.ssynchronized(_LOCK)
2132   @_RequireOpenQueue
2133   @_RequireNonDrainedQueue
2134   def SubmitJob(self, ops):
2135     """Create and store a new job.
2136
2137     @see: L{_SubmitJobUnlocked}
2138
2139     """
2140     (job_id, ) = self._NewSerialsUnlocked(1)
2141     self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
2142     return job_id
2143
2144   @locking.ssynchronized(_LOCK)
2145   @_RequireOpenQueue
2146   @_RequireNonDrainedQueue
2147   def SubmitManyJobs(self, jobs):
2148     """Create and store multiple jobs.
2149
2150     @see: L{_SubmitJobUnlocked}
2151
2152     """
2153     all_job_ids = self._NewSerialsUnlocked(len(jobs))
2154
2155     (results, added_jobs) = \
2156       self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])
2157
2158     self._EnqueueJobsUnlocked(added_jobs)
2159
2160     return results
2161
2162   @staticmethod
2163   def _FormatSubmitError(msg, ops):
2164     """Formats errors which occurred while submitting a job.
2165
2166     """
2167     return ("%s; opcodes %s" %
2168             (msg, utils.CommaJoin(op.Summary() for op in ops)))
2169
2170   @staticmethod
2171   def _ResolveJobDependencies(resolve_fn, deps):
2172     """Resolves relative job IDs in dependencies.
2173
2174     @type resolve_fn: callable
2175     @param resolve_fn: Function to resolve a relative job ID
2176     @type deps: list
2177     @param deps: Dependencies
2178     @rtype: tuple; (boolean, string or list)
2179     @return: If successful (first tuple item), the returned list contains
2180       resolved job IDs along with the requested status; if not successful,
2181       the second element is an error message
2182
2183     """
2184     result = []
2185
2186     for (dep_job_id, dep_status) in deps:
2187       if ht.TRelativeJobId(dep_job_id):
2188         assert ht.TInt(dep_job_id) and dep_job_id < 0
2189         try:
2190           job_id = resolve_fn(dep_job_id)
2191         except IndexError:
2192           # Abort
2193           return (False, "Unable to resolve relative job ID %s" % dep_job_id)
2194       else:
2195         job_id = dep_job_id
2196
2197       result.append((job_id, dep_status))
2198
2199     return (True, result)
2200
2201   def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
2202     """Create and store multiple jobs.
2203
2204     @see: L{_SubmitJobUnlocked}
2205
2206     """
2207     results = []
2208     added_jobs = []
2209
2210     def resolve_fn(job_idx, reljobid):
2211       assert reljobid < 0
2212       return (previous_job_ids + job_ids[:job_idx])[reljobid]
2213
2214     for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
2215       for op in ops:
2216         if getattr(op, opcodes.DEPEND_ATTR, None):
2217           (status, data) = \
2218             self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
2219                                          op.depends)
2220           if not status:
2221             # Abort resolving dependencies
2222             assert ht.TNonEmptyString(data), "No error message"
2223             break
2224           # Use resolved dependencies
2225           op.depends = data
2226       else:
2227         try:
2228           job = self._SubmitJobUnlocked(job_id, ops)
2229         except errors.GenericError, err:
2230           status = False
2231           data = self._FormatSubmitError(str(err), ops)
2232         else:
2233           status = True
2234           data = job_id
2235           added_jobs.append(job)
2236
2237       results.append((status, data))
2238
2239     return (results, added_jobs)
2240
2241   @locking.ssynchronized(_LOCK)
2242   def _EnqueueJobs(self, jobs):
2243     """Helper function to add jobs to worker pool's queue.
2244
2245     @type jobs: list
2246     @param jobs: List of all jobs
2247
2248     """
2249     return self._EnqueueJobsUnlocked(jobs)
2250
2251   def _EnqueueJobsUnlocked(self, jobs):
2252     """Helper function to add jobs to worker pool's queue.
2253
2254     @type jobs: list
2255     @param jobs: List of all jobs
2256
2257     """
2258     assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
2259     self._wpool.AddManyTasks([(job, ) for job in jobs],
2260                              priority=[job.CalcPriority() for job in jobs])
2261
2262   def _GetJobStatusForDependencies(self, job_id):
2263     """Gets the status of a job for dependencies.
2264
2265     @type job_id: int
2266     @param job_id: Job ID
2267     @raise errors.JobLost: If job can't be found
2268
2269     """
2270     # Not using in-memory cache as doing so would require an exclusive lock
2271
2272     # Try to load from disk
2273     job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2274
2275     assert not job.writable, "Got writable job" # pylint: disable=E1101
2276
2277     if job:
2278       return job.CalcStatus()
2279
2280     raise errors.JobLost("Job %s not found" % job_id)
2281
2282   @_RequireOpenQueue
2283   def UpdateJobUnlocked(self, job, replicate=True):
2284     """Update a job's on disk storage.
2285
2286     After a job has been modified, this function needs to be called in
2287     order to write the changes to disk and replicate them to the other
2288     nodes.
2289
2290     @type job: L{_QueuedJob}
2291     @param job: the changed job
2292     @type replicate: boolean
2293     @param replicate: whether to replicate the change to remote nodes
2294
2295     """
2296     if __debug__:
2297       finalized = job.CalcStatus() in constants.JOBS_FINALIZED
2298       assert (finalized ^ (job.end_timestamp is None))
2299       assert job.writable, "Can't update read-only job"
2300       assert not job.archived, "Can't update archived job"
2301
2302     filename = self._GetJobPath(job.id)
2303     data = serializer.DumpJson(job.Serialize())
2304     logging.debug("Writing job %s to %s", job.id, filename)
2305     self._UpdateJobQueueFile(filename, data, replicate)
2306
2307   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
2308                         timeout):
2309     """Waits for changes in a job.
2310
2311     @type job_id: int
2312     @param job_id: Job identifier
2313     @type fields: list of strings
2314     @param fields: Which fields to check for changes
2315     @type prev_job_info: list or None
2316     @param prev_job_info: Last job information returned
2317     @type prev_log_serial: int
2318     @param prev_log_serial: Last job message serial number
2319     @type timeout: float
2320     @param timeout: maximum time to wait in seconds
2321     @rtype: tuple (job info, log entries)
2322     @return: a tuple of the job information as required via
2323         the fields parameter, and the log entries as a list
2324
2325         if the job has not changed and the timeout has expired,
2326         we instead return a special value,
2327         L{constants.JOB_NOTCHANGED}, which should be interpreted
2328         as such by the clients
2329
2330     """
2331     load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
2332                              writable=False)
2333
2334     helper = _WaitForJobChangesHelper()
2335
2336     return helper(self._GetJobPath(job_id), load_fn,
2337                   fields, prev_job_info, prev_log_serial, timeout)
2338
2339   @locking.ssynchronized(_LOCK)
2340   @_RequireOpenQueue
2341   def CancelJob(self, job_id):
2342     """Cancels a job.
2343
2344     This will only succeed if the job has not started yet.
2345
2346     @type job_id: int
2347     @param job_id: job ID of job to be cancelled.
2348
2349     """
2350     logging.info("Cancelling job %s", job_id)
2351
2352     return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
2353
2354   def _ModifyJobUnlocked(self, job_id, mod_fn):
2355     """Modifies a job.
2356
2357     @type job_id: int
2358     @param job_id: Job ID
2359     @type mod_fn: callable
2360     @param mod_fn: Modifying function, receiving job object as parameter,
2361       returning tuple of (status boolean, message string)
2362
2363     """
2364     job = self._LoadJobUnlocked(job_id)
2365     if not job:
2366       logging.debug("Job %s not found", job_id)
2367       return (False, "Job %s not found" % job_id)
2368
2369     assert job.writable, "Can't modify read-only job"
2370     assert not job.archived, "Can't modify archived job"
2371
2372     (success, msg) = mod_fn(job)
2373
2374     if success:
2375       # If the job was finalized (e.g. cancelled), this is the final write
2376       # allowed. The job can be archived anytime.
2377       self.UpdateJobUnlocked(job)
2378
2379     return (success, msg)
2380
2381   @_RequireOpenQueue
2382   def _ArchiveJobsUnlocked(self, jobs):
2383     """Archives jobs.
2384
2385     @type jobs: list of L{_QueuedJob}
2386     @param jobs: Job objects
2387     @rtype: int
2388     @return: Number of archived jobs
2389
2390     """
2391     archive_jobs = []
2392     rename_files = []
2393     for job in jobs:
2394       assert job.writable, "Can't archive read-only job"
2395       assert not job.archived, "Can't cancel archived job"
2396
2397       if job.CalcStatus() not in constants.JOBS_FINALIZED:
2398         logging.debug("Job %s is not yet done", job.id)
2399         continue
2400
2401       archive_jobs.append(job)
2402
2403       old = self._GetJobPath(job.id)
2404       new = self._GetArchivedJobPath(job.id)
2405       rename_files.append((old, new))
2406
2407     # TODO: What if 1..n files fail to rename?
2408     self._RenameFilesUnlocked(rename_files)
2409
2410     logging.debug("Successfully archived job(s) %s",
2411                   utils.CommaJoin(job.id for job in archive_jobs))
2412
2413     # Since we haven't quite checked, above, if we succeeded or failed renaming
2414     # the files, we update the cached queue size from the filesystem. When we
2415     # get around to fix the TODO: above, we can use the number of actually
2416     # archived jobs to fix this.
2417     self._UpdateQueueSizeUnlocked()
2418     return len(archive_jobs)
2419
2420   @locking.ssynchronized(_LOCK)
2421   @_RequireOpenQueue
2422   def ArchiveJob(self, job_id):
2423     """Archives a job.
2424
2425     This is just a wrapper over L{_ArchiveJobsUnlocked}.
2426
2427     @type job_id: int
2428     @param job_id: Job ID of job to be archived.
2429     @rtype: bool
2430     @return: Whether job was archived
2431
2432     """
2433     logging.info("Archiving job %s", job_id)
2434
2435     job = self._LoadJobUnlocked(job_id)
2436     if not job:
2437       logging.debug("Job %s not found", job_id)
2438       return False
2439
2440     return self._ArchiveJobsUnlocked([job]) == 1
2441
2442   @locking.ssynchronized(_LOCK)
2443   @_RequireOpenQueue
2444   def AutoArchiveJobs(self, age, timeout):
2445     """Archives all jobs based on age.
2446
2447     The method will archive all jobs which are older than the age
2448     parameter. For jobs that don't have an end timestamp, the start
2449     timestamp will be considered. The special '-1' age will cause
2450     archival of all jobs (that are not running or queued).
2451
2452     @type age: int
2453     @param age: the minimum age in seconds
2454
2455     """
2456     logging.info("Archiving jobs with age more than %s seconds", age)
2457
2458     now = time.time()
2459     end_time = now + timeout
2460     archived_count = 0
2461     last_touched = 0
2462
2463     all_job_ids = self._GetJobIDsUnlocked()
2464     pending = []
2465     for idx, job_id in enumerate(all_job_ids):
2466       last_touched = idx + 1
2467
2468       # Not optimal because jobs could be pending
2469       # TODO: Measure average duration for job archival and take number of
2470       # pending jobs into account.
2471       if time.time() > end_time:
2472         break
2473
2474       # Returns None if the job failed to load
2475       job = self._LoadJobUnlocked(job_id)
2476       if job:
2477         if job.end_timestamp is None:
2478           if job.start_timestamp is None:
2479             job_age = job.received_timestamp
2480           else:
2481             job_age = job.start_timestamp
2482         else:
2483           job_age = job.end_timestamp
2484
2485         if age == -1 or now - job_age[0] > age:
2486           pending.append(job)
2487
2488           # Archive 10 jobs at a time
2489           if len(pending) >= 10:
2490             archived_count += self._ArchiveJobsUnlocked(pending)
2491             pending = []
2492
2493     if pending:
2494       archived_count += self._ArchiveJobsUnlocked(pending)
2495
2496     return (archived_count, len(all_job_ids) - last_touched)
2497
2498   def _Query(self, fields, qfilter):
2499     qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
2500                        namefield="id")
2501
2502     # Archived jobs are only looked at if the "archived" field is referenced
2503     # either as a requested field or in the filter. By default archived jobs
2504     # are ignored.
2505     include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())
2506
2507     job_ids = qobj.RequestedNames()
2508
2509     list_all = (job_ids is None)
2510
2511     if list_all:
2512       # Since files are added to/removed from the queue atomically, there's no
2513       # risk of getting the job ids in an inconsistent state.
2514       job_ids = self._GetJobIDsUnlocked(archived=include_archived)
2515
2516     jobs = []
2517
2518     for job_id in job_ids:
2519       job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2520       if job is not None or not list_all:
2521         jobs.append((job_id, job))
2522
2523     return (qobj, jobs, list_all)
2524
2525   def QueryJobs(self, fields, qfilter):
2526     """Returns a list of jobs in queue.
2527
2528     @type fields: sequence
2529     @param fields: List of wanted fields
2530     @type qfilter: None or query2 filter (list)
2531     @param qfilter: Query filter
2532
2533     """
2534     (qobj, ctx, _) = self._Query(fields, qfilter)
2535
2536     return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
2537
2538   def OldStyleQueryJobs(self, job_ids, fields):
2539     """Returns a list of jobs in queue.
2540
2541     @type job_ids: list
2542     @param job_ids: sequence of job identifiers or None for all
2543     @type fields: list
2544     @param fields: names of fields to return
2545     @rtype: list
2546     @return: list one element per job, each element being list with
2547         the requested fields
2548
2549     """
2550     # backwards compat:
2551     job_ids = [int(jid) for jid in job_ids]
2552     qfilter = qlang.MakeSimpleFilter("id", job_ids)
2553
2554     (qobj, ctx, _) = self._Query(fields, qfilter)
2555
2556     return qobj.OldStyleQuery(ctx, sort_by_name=False)
2557
2558   @locking.ssynchronized(_LOCK)
2559   def PrepareShutdown(self):
2560     """Prepare to stop the job queue.
2561
2562     Disables execution of jobs in the workerpool and returns whether there are
2563     any jobs currently running. If the latter is the case, the job queue is not
2564     yet ready for shutdown. Once this function returns C{True} L{Shutdown} can
2565     be called without interfering with any job. Queued and unfinished jobs will
2566     be resumed next time.
2567
2568     Once this function has been called no new job submissions will be accepted
2569     (see L{_RequireNonDrainedQueue}).
2570
2571     @rtype: bool
2572     @return: Whether there are any running jobs
2573
2574     """
2575     if self._accepting_jobs:
2576       self._accepting_jobs = False
2577
2578       # Tell worker pool to stop processing pending tasks
2579       self._wpool.SetActive(False)
2580
2581     return self._wpool.HasRunningTasks()
2582
2583   def AcceptingJobsUnlocked(self):
2584     """Returns whether jobs are accepted.
2585
2586     Once L{PrepareShutdown} has been called, no new jobs are accepted and the
2587     queue is shutting down.
2588
2589     @rtype: bool
2590
2591     """
2592     return self._accepting_jobs
2593
2594   @locking.ssynchronized(_LOCK)
2595   @_RequireOpenQueue
2596   def Shutdown(self):
2597     """Stops the job queue.
2598
2599     This shutdowns all the worker threads an closes the queue.
2600
2601     """
2602     self._wpool.TerminateWorkers()
2603
2604     self._queue_filelock.Close()
2605     self._queue_filelock = None