NEWS: Fix release date for 2.7.0 beta1
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import logging
33 import errno
34 import time
35 import weakref
36 import threading
37 import itertools
38 import operator
39
40 try:
41   # pylint: disable=E0611
42   from pyinotify import pyinotify
43 except ImportError:
44   import pyinotify
45
46 from ganeti import asyncnotifier
47 from ganeti import constants
48 from ganeti import serializer
49 from ganeti import workerpool
50 from ganeti import locking
51 from ganeti import opcodes
52 from ganeti import errors
53 from ganeti import mcpu
54 from ganeti import utils
55 from ganeti import jstore
56 from ganeti import rpc
57 from ganeti import runtime
58 from ganeti import netutils
59 from ganeti import compat
60 from ganeti import ht
61 from ganeti import query
62 from ganeti import qlang
63 from ganeti import pathutils
64 from ganeti import vcluster
65
66
67 JOBQUEUE_THREADS = 25
68
69 # member lock names to be passed to @ssynchronized decorator
70 _LOCK = "_lock"
71 _QUEUE = "_queue"
72
73 #: Retrieves "id" attribute
74 _GetIdAttr = operator.attrgetter("id")
75
76
77 class CancelJob(Exception):
78   """Special exception to cancel a job.
79
80   """
81
82
83 class QueueShutdown(Exception):
84   """Special exception to abort a job when the job queue is shutting down.
85
86   """
87
88
89 def TimeStampNow():
90   """Returns the current timestamp.
91
92   @rtype: tuple
93   @return: the current time in the (seconds, microseconds) format
94
95   """
96   return utils.SplitTime(time.time())
97
98
99 def _CallJqUpdate(runner, names, file_name, content):
100   """Updates job queue file after virtualizing filename.
101
102   """
103   virt_file_name = vcluster.MakeVirtualPath(file_name)
104   return runner.call_jobqueue_update(names, virt_file_name, content)
105
106
107 class _SimpleJobQuery:
108   """Wrapper for job queries.
109
110   Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.
111
112   """
113   def __init__(self, fields):
114     """Initializes this class.
115
116     """
117     self._query = query.Query(query.JOB_FIELDS, fields)
118
119   def __call__(self, job):
120     """Executes a job query using cached field list.
121
122     """
123     return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
124
125
126 class _QueuedOpCode(object):
127   """Encapsulates an opcode object.
128
129   @ivar log: holds the execution log and consists of tuples
130   of the form C{(log_serial, timestamp, level, message)}
131   @ivar input: the OpCode we encapsulate
132   @ivar status: the current status
133   @ivar result: the result of the LU execution
134   @ivar start_timestamp: timestamp for the start of the execution
135   @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
136   @ivar stop_timestamp: timestamp for the end of the execution
137
138   """
139   __slots__ = ["input", "status", "result", "log", "priority",
140                "start_timestamp", "exec_timestamp", "end_timestamp",
141                "__weakref__"]
142
143   def __init__(self, op):
144     """Initializes instances of this class.
145
146     @type op: L{opcodes.OpCode}
147     @param op: the opcode we encapsulate
148
149     """
150     self.input = op
151     self.status = constants.OP_STATUS_QUEUED
152     self.result = None
153     self.log = []
154     self.start_timestamp = None
155     self.exec_timestamp = None
156     self.end_timestamp = None
157
158     # Get initial priority (it might change during the lifetime of this opcode)
159     self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
160
161   @classmethod
162   def Restore(cls, state):
163     """Restore the _QueuedOpCode from the serialized form.
164
165     @type state: dict
166     @param state: the serialized state
167     @rtype: _QueuedOpCode
168     @return: a new _QueuedOpCode instance
169
170     """
171     obj = _QueuedOpCode.__new__(cls)
172     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
173     obj.status = state["status"]
174     obj.result = state["result"]
175     obj.log = state["log"]
176     obj.start_timestamp = state.get("start_timestamp", None)
177     obj.exec_timestamp = state.get("exec_timestamp", None)
178     obj.end_timestamp = state.get("end_timestamp", None)
179     obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
180     return obj
181
182   def Serialize(self):
183     """Serializes this _QueuedOpCode.
184
185     @rtype: dict
186     @return: the dictionary holding the serialized state
187
188     """
189     return {
190       "input": self.input.__getstate__(),
191       "status": self.status,
192       "result": self.result,
193       "log": self.log,
194       "start_timestamp": self.start_timestamp,
195       "exec_timestamp": self.exec_timestamp,
196       "end_timestamp": self.end_timestamp,
197       "priority": self.priority,
198       }
199
200
201 class _QueuedJob(object):
202   """In-memory job representation.
203
204   This is what we use to track the user-submitted jobs. Locking must
205   be taken care of by users of this class.
206
207   @type queue: L{JobQueue}
208   @ivar queue: the parent queue
209   @ivar id: the job ID
210   @type ops: list
211   @ivar ops: the list of _QueuedOpCode that constitute the job
212   @type log_serial: int
213   @ivar log_serial: holds the index for the next log entry
214   @ivar received_timestamp: the timestamp for when the job was received
215   @ivar start_timestmap: the timestamp for start of execution
216   @ivar end_timestamp: the timestamp for end of execution
217   @ivar writable: Whether the job is allowed to be modified
218
219   """
220   # pylint: disable=W0212
221   __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
222                "received_timestamp", "start_timestamp", "end_timestamp",
223                "__weakref__", "processor_lock", "writable", "archived"]
224
225   def __init__(self, queue, job_id, ops, writable):
226     """Constructor for the _QueuedJob.
227
228     @type queue: L{JobQueue}
229     @param queue: our parent queue
230     @type job_id: job_id
231     @param job_id: our job id
232     @type ops: list
233     @param ops: the list of opcodes we hold, which will be encapsulated
234         in _QueuedOpCodes
235     @type writable: bool
236     @param writable: Whether job can be modified
237
238     """
239     if not ops:
240       raise errors.GenericError("A job needs at least one opcode")
241
242     self.queue = queue
243     self.id = int(job_id)
244     self.ops = [_QueuedOpCode(op) for op in ops]
245     self.log_serial = 0
246     self.received_timestamp = TimeStampNow()
247     self.start_timestamp = None
248     self.end_timestamp = None
249     self.archived = False
250
251     self._InitInMemory(self, writable)
252
253     assert not self.archived, "New jobs can not be marked as archived"
254
255   @staticmethod
256   def _InitInMemory(obj, writable):
257     """Initializes in-memory variables.
258
259     """
260     obj.writable = writable
261     obj.ops_iter = None
262     obj.cur_opctx = None
263
264     # Read-only jobs are not processed and therefore don't need a lock
265     if writable:
266       obj.processor_lock = threading.Lock()
267     else:
268       obj.processor_lock = None
269
270   def __repr__(self):
271     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
272               "id=%s" % self.id,
273               "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
274
275     return "<%s at %#x>" % (" ".join(status), id(self))
276
277   @classmethod
278   def Restore(cls, queue, state, writable, archived):
279     """Restore a _QueuedJob from serialized state:
280
281     @type queue: L{JobQueue}
282     @param queue: to which queue the restored job belongs
283     @type state: dict
284     @param state: the serialized state
285     @type writable: bool
286     @param writable: Whether job can be modified
287     @type archived: bool
288     @param archived: Whether job was already archived
289     @rtype: _JobQueue
290     @return: the restored _JobQueue instance
291
292     """
293     obj = _QueuedJob.__new__(cls)
294     obj.queue = queue
295     obj.id = int(state["id"])
296     obj.received_timestamp = state.get("received_timestamp", None)
297     obj.start_timestamp = state.get("start_timestamp", None)
298     obj.end_timestamp = state.get("end_timestamp", None)
299     obj.archived = archived
300
301     obj.ops = []
302     obj.log_serial = 0
303     for op_state in state["ops"]:
304       op = _QueuedOpCode.Restore(op_state)
305       for log_entry in op.log:
306         obj.log_serial = max(obj.log_serial, log_entry[0])
307       obj.ops.append(op)
308
309     cls._InitInMemory(obj, writable)
310
311     return obj
312
313   def Serialize(self):
314     """Serialize the _JobQueue instance.
315
316     @rtype: dict
317     @return: the serialized state
318
319     """
320     return {
321       "id": self.id,
322       "ops": [op.Serialize() for op in self.ops],
323       "start_timestamp": self.start_timestamp,
324       "end_timestamp": self.end_timestamp,
325       "received_timestamp": self.received_timestamp,
326       }
327
328   def CalcStatus(self):
329     """Compute the status of this job.
330
331     This function iterates over all the _QueuedOpCodes in the job and
332     based on their status, computes the job status.
333
334     The algorithm is:
335       - if we find a cancelled, or finished with error, the job
336         status will be the same
337       - otherwise, the last opcode with the status one of:
338           - waitlock
339           - canceling
340           - running
341
342         will determine the job status
343
344       - otherwise, it means either all opcodes are queued, or success,
345         and the job status will be the same
346
347     @return: the job status
348
349     """
350     status = constants.JOB_STATUS_QUEUED
351
352     all_success = True
353     for op in self.ops:
354       if op.status == constants.OP_STATUS_SUCCESS:
355         continue
356
357       all_success = False
358
359       if op.status == constants.OP_STATUS_QUEUED:
360         pass
361       elif op.status == constants.OP_STATUS_WAITING:
362         status = constants.JOB_STATUS_WAITING
363       elif op.status == constants.OP_STATUS_RUNNING:
364         status = constants.JOB_STATUS_RUNNING
365       elif op.status == constants.OP_STATUS_CANCELING:
366         status = constants.JOB_STATUS_CANCELING
367         break
368       elif op.status == constants.OP_STATUS_ERROR:
369         status = constants.JOB_STATUS_ERROR
370         # The whole job fails if one opcode failed
371         break
372       elif op.status == constants.OP_STATUS_CANCELED:
373         status = constants.OP_STATUS_CANCELED
374         break
375
376     if all_success:
377       status = constants.JOB_STATUS_SUCCESS
378
379     return status
380
381   def CalcPriority(self):
382     """Gets the current priority for this job.
383
384     Only unfinished opcodes are considered. When all are done, the default
385     priority is used.
386
387     @rtype: int
388
389     """
390     priorities = [op.priority for op in self.ops
391                   if op.status not in constants.OPS_FINALIZED]
392
393     if not priorities:
394       # All opcodes are done, assume default priority
395       return constants.OP_PRIO_DEFAULT
396
397     return min(priorities)
398
399   def GetLogEntries(self, newer_than):
400     """Selectively returns the log entries.
401
402     @type newer_than: None or int
403     @param newer_than: if this is None, return all log entries,
404         otherwise return only the log entries with serial higher
405         than this value
406     @rtype: list
407     @return: the list of the log entries selected
408
409     """
410     if newer_than is None:
411       serial = -1
412     else:
413       serial = newer_than
414
415     entries = []
416     for op in self.ops:
417       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
418
419     return entries
420
421   def GetInfo(self, fields):
422     """Returns information about a job.
423
424     @type fields: list
425     @param fields: names of fields to return
426     @rtype: list
427     @return: list with one element for each field
428     @raise errors.OpExecError: when an invalid field
429         has been passed
430
431     """
432     return _SimpleJobQuery(fields)(self)
433
434   def MarkUnfinishedOps(self, status, result):
435     """Mark unfinished opcodes with a given status and result.
436
437     This is an utility function for marking all running or waiting to
438     be run opcodes with a given status. Opcodes which are already
439     finalised are not changed.
440
441     @param status: a given opcode status
442     @param result: the opcode result
443
444     """
445     not_marked = True
446     for op in self.ops:
447       if op.status in constants.OPS_FINALIZED:
448         assert not_marked, "Finalized opcodes found after non-finalized ones"
449         continue
450       op.status = status
451       op.result = result
452       not_marked = False
453
454   def Finalize(self):
455     """Marks the job as finalized.
456
457     """
458     self.end_timestamp = TimeStampNow()
459
460   def Cancel(self):
461     """Marks job as canceled/-ing if possible.
462
463     @rtype: tuple; (bool, string)
464     @return: Boolean describing whether job was successfully canceled or marked
465       as canceling and a text message
466
467     """
468     status = self.CalcStatus()
469
470     if status == constants.JOB_STATUS_QUEUED:
471       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
472                              "Job canceled by request")
473       self.Finalize()
474       return (True, "Job %s canceled" % self.id)
475
476     elif status == constants.JOB_STATUS_WAITING:
477       # The worker will notice the new status and cancel the job
478       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
479       return (True, "Job %s will be canceled" % self.id)
480
481     else:
482       logging.debug("Job %s is no longer waiting in the queue", self.id)
483       return (False, "Job %s is no longer waiting in the queue" % self.id)
484
485   def ChangePriority(self, priority):
486     """Changes the job priority.
487
488     @type priority: int
489     @param priority: New priority
490     @rtype: tuple; (bool, string)
491     @return: Boolean describing whether job's priority was successfully changed
492       and a text message
493
494     """
495     status = self.CalcStatus()
496
497     if status in constants.JOBS_FINALIZED:
498       return (False, "Job %s is finished" % self.id)
499     elif status == constants.JOB_STATUS_CANCELING:
500       return (False, "Job %s is cancelling" % self.id)
501     else:
502       assert status in (constants.JOB_STATUS_QUEUED,
503                         constants.JOB_STATUS_WAITING,
504                         constants.JOB_STATUS_RUNNING)
505
506       changed = False
507       for op in self.ops:
508         if (op.status == constants.OP_STATUS_RUNNING or
509             op.status in constants.OPS_FINALIZED):
510           assert not changed, \
511             ("Found opcode for which priority should not be changed after"
512              " priority has been changed for previous opcodes")
513           continue
514
515         assert op.status in (constants.OP_STATUS_QUEUED,
516                              constants.OP_STATUS_WAITING)
517
518         changed = True
519
520         # Set new priority (doesn't modify opcode input)
521         op.priority = priority
522
523       if changed:
524         return (True, ("Priorities of pending opcodes for job %s have been"
525                        " changed to %s" % (self.id, priority)))
526       else:
527         return (False, "Job %s had no pending opcodes" % self.id)
528
529
530 class _OpExecCallbacks(mcpu.OpExecCbBase):
531   def __init__(self, queue, job, op):
532     """Initializes this class.
533
534     @type queue: L{JobQueue}
535     @param queue: Job queue
536     @type job: L{_QueuedJob}
537     @param job: Job object
538     @type op: L{_QueuedOpCode}
539     @param op: OpCode
540
541     """
542     assert queue, "Queue is missing"
543     assert job, "Job is missing"
544     assert op, "Opcode is missing"
545
546     self._queue = queue
547     self._job = job
548     self._op = op
549
550   def _CheckCancel(self):
551     """Raises an exception to cancel the job if asked to.
552
553     """
554     # Cancel here if we were asked to
555     if self._op.status == constants.OP_STATUS_CANCELING:
556       logging.debug("Canceling opcode")
557       raise CancelJob()
558
559     # See if queue is shutting down
560     if not self._queue.AcceptingJobsUnlocked():
561       logging.debug("Queue is shutting down")
562       raise QueueShutdown()
563
564   @locking.ssynchronized(_QUEUE, shared=1)
565   def NotifyStart(self):
566     """Mark the opcode as running, not lock-waiting.
567
568     This is called from the mcpu code as a notifier function, when the LU is
569     finally about to start the Exec() method. Of course, to have end-user
570     visible results, the opcode must be initially (before calling into
571     Processor.ExecOpCode) set to OP_STATUS_WAITING.
572
573     """
574     assert self._op in self._job.ops
575     assert self._op.status in (constants.OP_STATUS_WAITING,
576                                constants.OP_STATUS_CANCELING)
577
578     # Cancel here if we were asked to
579     self._CheckCancel()
580
581     logging.debug("Opcode is now running")
582
583     self._op.status = constants.OP_STATUS_RUNNING
584     self._op.exec_timestamp = TimeStampNow()
585
586     # And finally replicate the job status
587     self._queue.UpdateJobUnlocked(self._job)
588
589   @locking.ssynchronized(_QUEUE, shared=1)
590   def _AppendFeedback(self, timestamp, log_type, log_msg):
591     """Internal feedback append function, with locks
592
593     """
594     self._job.log_serial += 1
595     self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
596     self._queue.UpdateJobUnlocked(self._job, replicate=False)
597
598   def Feedback(self, *args):
599     """Append a log entry.
600
601     """
602     assert len(args) < 3
603
604     if len(args) == 1:
605       log_type = constants.ELOG_MESSAGE
606       log_msg = args[0]
607     else:
608       (log_type, log_msg) = args
609
610     # The time is split to make serialization easier and not lose
611     # precision.
612     timestamp = utils.SplitTime(time.time())
613     self._AppendFeedback(timestamp, log_type, log_msg)
614
615   def CurrentPriority(self):
616     """Returns current priority for opcode.
617
618     """
619     assert self._op.status in (constants.OP_STATUS_WAITING,
620                                constants.OP_STATUS_CANCELING)
621
622     # Cancel here if we were asked to
623     self._CheckCancel()
624
625     return self._op.priority
626
627   def SubmitManyJobs(self, jobs):
628     """Submits jobs for processing.
629
630     See L{JobQueue.SubmitManyJobs}.
631
632     """
633     # Locking is done in job queue
634     return self._queue.SubmitManyJobs(jobs)
635
636
637 class _JobChangesChecker(object):
638   def __init__(self, fields, prev_job_info, prev_log_serial):
639     """Initializes this class.
640
641     @type fields: list of strings
642     @param fields: Fields requested by LUXI client
643     @type prev_job_info: string
644     @param prev_job_info: previous job info, as passed by the LUXI client
645     @type prev_log_serial: string
646     @param prev_log_serial: previous job serial, as passed by the LUXI client
647
648     """
649     self._squery = _SimpleJobQuery(fields)
650     self._prev_job_info = prev_job_info
651     self._prev_log_serial = prev_log_serial
652
653   def __call__(self, job):
654     """Checks whether job has changed.
655
656     @type job: L{_QueuedJob}
657     @param job: Job object
658
659     """
660     assert not job.writable, "Expected read-only job"
661
662     status = job.CalcStatus()
663     job_info = self._squery(job)
664     log_entries = job.GetLogEntries(self._prev_log_serial)
665
666     # Serializing and deserializing data can cause type changes (e.g. from
667     # tuple to list) or precision loss. We're doing it here so that we get
668     # the same modifications as the data received from the client. Without
669     # this, the comparison afterwards might fail without the data being
670     # significantly different.
671     # TODO: we just deserialized from disk, investigate how to make sure that
672     # the job info and log entries are compatible to avoid this further step.
673     # TODO: Doing something like in testutils.py:UnifyValueType might be more
674     # efficient, though floats will be tricky
675     job_info = serializer.LoadJson(serializer.DumpJson(job_info))
676     log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
677
678     # Don't even try to wait if the job is no longer running, there will be
679     # no changes.
680     if (status not in (constants.JOB_STATUS_QUEUED,
681                        constants.JOB_STATUS_RUNNING,
682                        constants.JOB_STATUS_WAITING) or
683         job_info != self._prev_job_info or
684         (log_entries and self._prev_log_serial != log_entries[0][0])):
685       logging.debug("Job %s changed", job.id)
686       return (job_info, log_entries)
687
688     return None
689
690
691 class _JobFileChangesWaiter(object):
692   def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
693     """Initializes this class.
694
695     @type filename: string
696     @param filename: Path to job file
697     @raises errors.InotifyError: if the notifier cannot be setup
698
699     """
700     self._wm = _inotify_wm_cls()
701     self._inotify_handler = \
702       asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
703     self._notifier = \
704       pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
705     try:
706       self._inotify_handler.enable()
707     except Exception:
708       # pyinotify doesn't close file descriptors automatically
709       self._notifier.stop()
710       raise
711
712   def _OnInotify(self, notifier_enabled):
713     """Callback for inotify.
714
715     """
716     if not notifier_enabled:
717       self._inotify_handler.enable()
718
719   def Wait(self, timeout):
720     """Waits for the job file to change.
721
722     @type timeout: float
723     @param timeout: Timeout in seconds
724     @return: Whether there have been events
725
726     """
727     assert timeout >= 0
728     have_events = self._notifier.check_events(timeout * 1000)
729     if have_events:
730       self._notifier.read_events()
731     self._notifier.process_events()
732     return have_events
733
734   def Close(self):
735     """Closes underlying notifier and its file descriptor.
736
737     """
738     self._notifier.stop()
739
740
741 class _JobChangesWaiter(object):
742   def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
743     """Initializes this class.
744
745     @type filename: string
746     @param filename: Path to job file
747
748     """
749     self._filewaiter = None
750     self._filename = filename
751     self._waiter_cls = _waiter_cls
752
753   def Wait(self, timeout):
754     """Waits for a job to change.
755
756     @type timeout: float
757     @param timeout: Timeout in seconds
758     @return: Whether there have been events
759
760     """
761     if self._filewaiter:
762       return self._filewaiter.Wait(timeout)
763
764     # Lazy setup: Avoid inotify setup cost when job file has already changed.
765     # If this point is reached, return immediately and let caller check the job
766     # file again in case there were changes since the last check. This avoids a
767     # race condition.
768     self._filewaiter = self._waiter_cls(self._filename)
769
770     return True
771
772   def Close(self):
773     """Closes underlying waiter.
774
775     """
776     if self._filewaiter:
777       self._filewaiter.Close()
778
779
780 class _WaitForJobChangesHelper(object):
781   """Helper class using inotify to wait for changes in a job file.
782
783   This class takes a previous job status and serial, and alerts the client when
784   the current job status has changed.
785
786   """
787   @staticmethod
788   def _CheckForChanges(counter, job_load_fn, check_fn):
789     if counter.next() > 0:
790       # If this isn't the first check the job is given some more time to change
791       # again. This gives better performance for jobs generating many
792       # changes/messages.
793       time.sleep(0.1)
794
795     job = job_load_fn()
796     if not job:
797       raise errors.JobLost()
798
799     result = check_fn(job)
800     if result is None:
801       raise utils.RetryAgain()
802
803     return result
804
805   def __call__(self, filename, job_load_fn,
806                fields, prev_job_info, prev_log_serial, timeout,
807                _waiter_cls=_JobChangesWaiter):
808     """Waits for changes on a job.
809
810     @type filename: string
811     @param filename: File on which to wait for changes
812     @type job_load_fn: callable
813     @param job_load_fn: Function to load job
814     @type fields: list of strings
815     @param fields: Which fields to check for changes
816     @type prev_job_info: list or None
817     @param prev_job_info: Last job information returned
818     @type prev_log_serial: int
819     @param prev_log_serial: Last job message serial number
820     @type timeout: float
821     @param timeout: maximum time to wait in seconds
822
823     """
824     counter = itertools.count()
825     try:
826       check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
827       waiter = _waiter_cls(filename)
828       try:
829         return utils.Retry(compat.partial(self._CheckForChanges,
830                                           counter, job_load_fn, check_fn),
831                            utils.RETRY_REMAINING_TIME, timeout,
832                            wait_fn=waiter.Wait)
833       finally:
834         waiter.Close()
835     except errors.JobLost:
836       return None
837     except utils.RetryTimeout:
838       return constants.JOB_NOTCHANGED
839
840
841 def _EncodeOpError(err):
842   """Encodes an error which occurred while processing an opcode.
843
844   """
845   if isinstance(err, errors.GenericError):
846     to_encode = err
847   else:
848     to_encode = errors.OpExecError(str(err))
849
850   return errors.EncodeException(to_encode)
851
852
853 class _TimeoutStrategyWrapper:
854   def __init__(self, fn):
855     """Initializes this class.
856
857     """
858     self._fn = fn
859     self._next = None
860
861   def _Advance(self):
862     """Gets the next timeout if necessary.
863
864     """
865     if self._next is None:
866       self._next = self._fn()
867
868   def Peek(self):
869     """Returns the next timeout.
870
871     """
872     self._Advance()
873     return self._next
874
875   def Next(self):
876     """Returns the current timeout and advances the internal state.
877
878     """
879     self._Advance()
880     result = self._next
881     self._next = None
882     return result
883
884
885 class _OpExecContext:
886   def __init__(self, op, index, log_prefix, timeout_strategy_factory):
887     """Initializes this class.
888
889     """
890     self.op = op
891     self.index = index
892     self.log_prefix = log_prefix
893     self.summary = op.input.Summary()
894
895     # Create local copy to modify
896     if getattr(op.input, opcodes.DEPEND_ATTR, None):
897       self.jobdeps = op.input.depends[:]
898     else:
899       self.jobdeps = None
900
901     self._timeout_strategy_factory = timeout_strategy_factory
902     self._ResetTimeoutStrategy()
903
904   def _ResetTimeoutStrategy(self):
905     """Creates a new timeout strategy.
906
907     """
908     self._timeout_strategy = \
909       _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
910
911   def CheckPriorityIncrease(self):
912     """Checks whether priority can and should be increased.
913
914     Called when locks couldn't be acquired.
915
916     """
917     op = self.op
918
919     # Exhausted all retries and next round should not use blocking acquire
920     # for locks?
921     if (self._timeout_strategy.Peek() is None and
922         op.priority > constants.OP_PRIO_HIGHEST):
923       logging.debug("Increasing priority")
924       op.priority -= 1
925       self._ResetTimeoutStrategy()
926       return True
927
928     return False
929
930   def GetNextLockTimeout(self):
931     """Returns the next lock acquire timeout.
932
933     """
934     return self._timeout_strategy.Next()
935
936
937 class _JobProcessor(object):
938   (DEFER,
939    WAITDEP,
940    FINISHED) = range(1, 4)
941
942   def __init__(self, queue, opexec_fn, job,
943                _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
944     """Initializes this class.
945
946     """
947     self.queue = queue
948     self.opexec_fn = opexec_fn
949     self.job = job
950     self._timeout_strategy_factory = _timeout_strategy_factory
951
952   @staticmethod
953   def _FindNextOpcode(job, timeout_strategy_factory):
954     """Locates the next opcode to run.
955
956     @type job: L{_QueuedJob}
957     @param job: Job object
958     @param timeout_strategy_factory: Callable to create new timeout strategy
959
960     """
961     # Create some sort of a cache to speed up locating next opcode for future
962     # lookups
963     # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
964     # pending and one for processed ops.
965     if job.ops_iter is None:
966       job.ops_iter = enumerate(job.ops)
967
968     # Find next opcode to run
969     while True:
970       try:
971         (idx, op) = job.ops_iter.next()
972       except StopIteration:
973         raise errors.ProgrammerError("Called for a finished job")
974
975       if op.status == constants.OP_STATUS_RUNNING:
976         # Found an opcode already marked as running
977         raise errors.ProgrammerError("Called for job marked as running")
978
979       opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
980                              timeout_strategy_factory)
981
982       if op.status not in constants.OPS_FINALIZED:
983         return opctx
984
985       # This is a job that was partially completed before master daemon
986       # shutdown, so it can be expected that some opcodes are already
987       # completed successfully (if any did error out, then the whole job
988       # should have been aborted and not resubmitted for processing).
989       logging.info("%s: opcode %s already processed, skipping",
990                    opctx.log_prefix, opctx.summary)
991
992   @staticmethod
993   def _MarkWaitlock(job, op):
994     """Marks an opcode as waiting for locks.
995
996     The job's start timestamp is also set if necessary.
997
998     @type job: L{_QueuedJob}
999     @param job: Job object
1000     @type op: L{_QueuedOpCode}
1001     @param op: Opcode object
1002
1003     """
1004     assert op in job.ops
1005     assert op.status in (constants.OP_STATUS_QUEUED,
1006                          constants.OP_STATUS_WAITING)
1007
1008     update = False
1009
1010     op.result = None
1011
1012     if op.status == constants.OP_STATUS_QUEUED:
1013       op.status = constants.OP_STATUS_WAITING
1014       update = True
1015
1016     if op.start_timestamp is None:
1017       op.start_timestamp = TimeStampNow()
1018       update = True
1019
1020     if job.start_timestamp is None:
1021       job.start_timestamp = op.start_timestamp
1022       update = True
1023
1024     assert op.status == constants.OP_STATUS_WAITING
1025
1026     return update
1027
1028   @staticmethod
1029   def _CheckDependencies(queue, job, opctx):
1030     """Checks if an opcode has dependencies and if so, processes them.
1031
1032     @type queue: L{JobQueue}
1033     @param queue: Queue object
1034     @type job: L{_QueuedJob}
1035     @param job: Job object
1036     @type opctx: L{_OpExecContext}
1037     @param opctx: Opcode execution context
1038     @rtype: bool
1039     @return: Whether opcode will be re-scheduled by dependency tracker
1040
1041     """
1042     op = opctx.op
1043
1044     result = False
1045
1046     while opctx.jobdeps:
1047       (dep_job_id, dep_status) = opctx.jobdeps[0]
1048
1049       (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
1050                                                           dep_status)
1051       assert ht.TNonEmptyString(depmsg), "No dependency message"
1052
1053       logging.info("%s: %s", opctx.log_prefix, depmsg)
1054
1055       if depresult == _JobDependencyManager.CONTINUE:
1056         # Remove dependency and continue
1057         opctx.jobdeps.pop(0)
1058
1059       elif depresult == _JobDependencyManager.WAIT:
1060         # Need to wait for notification, dependency tracker will re-add job
1061         # to workerpool
1062         result = True
1063         break
1064
1065       elif depresult == _JobDependencyManager.CANCEL:
1066         # Job was cancelled, cancel this job as well
1067         job.Cancel()
1068         assert op.status == constants.OP_STATUS_CANCELING
1069         break
1070
1071       elif depresult in (_JobDependencyManager.WRONGSTATUS,
1072                          _JobDependencyManager.ERROR):
1073         # Job failed or there was an error, this job must fail
1074         op.status = constants.OP_STATUS_ERROR
1075         op.result = _EncodeOpError(errors.OpExecError(depmsg))
1076         break
1077
1078       else:
1079         raise errors.ProgrammerError("Unknown dependency result '%s'" %
1080                                      depresult)
1081
1082     return result
1083
1084   def _ExecOpCodeUnlocked(self, opctx):
1085     """Processes one opcode and returns the result.
1086
1087     """
1088     op = opctx.op
1089
1090     assert op.status == constants.OP_STATUS_WAITING
1091
1092     timeout = opctx.GetNextLockTimeout()
1093
1094     try:
1095       # Make sure not to hold queue lock while calling ExecOpCode
1096       result = self.opexec_fn(op.input,
1097                               _OpExecCallbacks(self.queue, self.job, op),
1098                               timeout=timeout)
1099     except mcpu.LockAcquireTimeout:
1100       assert timeout is not None, "Received timeout for blocking acquire"
1101       logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1102
1103       assert op.status in (constants.OP_STATUS_WAITING,
1104                            constants.OP_STATUS_CANCELING)
1105
1106       # Was job cancelled while we were waiting for the lock?
1107       if op.status == constants.OP_STATUS_CANCELING:
1108         return (constants.OP_STATUS_CANCELING, None)
1109
1110       # Queue is shutting down, return to queued
1111       if not self.queue.AcceptingJobsUnlocked():
1112         return (constants.OP_STATUS_QUEUED, None)
1113
1114       # Stay in waitlock while trying to re-acquire lock
1115       return (constants.OP_STATUS_WAITING, None)
1116     except CancelJob:
1117       logging.exception("%s: Canceling job", opctx.log_prefix)
1118       assert op.status == constants.OP_STATUS_CANCELING
1119       return (constants.OP_STATUS_CANCELING, None)
1120
1121     except QueueShutdown:
1122       logging.exception("%s: Queue is shutting down", opctx.log_prefix)
1123
1124       assert op.status == constants.OP_STATUS_WAITING
1125
1126       # Job hadn't been started yet, so it should return to the queue
1127       return (constants.OP_STATUS_QUEUED, None)
1128
1129     except Exception, err: # pylint: disable=W0703
1130       logging.exception("%s: Caught exception in %s",
1131                         opctx.log_prefix, opctx.summary)
1132       return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1133     else:
1134       logging.debug("%s: %s successful",
1135                     opctx.log_prefix, opctx.summary)
1136       return (constants.OP_STATUS_SUCCESS, result)
1137
1138   def __call__(self, _nextop_fn=None):
1139     """Continues execution of a job.
1140
1141     @param _nextop_fn: Callback function for tests
1142     @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1143       be deferred and C{WAITDEP} if the dependency manager
1144       (L{_JobDependencyManager}) will re-schedule the job when appropriate
1145
1146     """
1147     queue = self.queue
1148     job = self.job
1149
1150     logging.debug("Processing job %s", job.id)
1151
1152     queue.acquire(shared=1)
1153     try:
1154       opcount = len(job.ops)
1155
1156       assert job.writable, "Expected writable job"
1157
1158       # Don't do anything for finalized jobs
1159       if job.CalcStatus() in constants.JOBS_FINALIZED:
1160         return self.FINISHED
1161
1162       # Is a previous opcode still pending?
1163       if job.cur_opctx:
1164         opctx = job.cur_opctx
1165         job.cur_opctx = None
1166       else:
1167         if __debug__ and _nextop_fn:
1168           _nextop_fn()
1169         opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1170
1171       op = opctx.op
1172
1173       # Consistency check
1174       assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1175                                      constants.OP_STATUS_CANCELING)
1176                         for i in job.ops[opctx.index + 1:])
1177
1178       assert op.status in (constants.OP_STATUS_QUEUED,
1179                            constants.OP_STATUS_WAITING,
1180                            constants.OP_STATUS_CANCELING)
1181
1182       assert (op.priority <= constants.OP_PRIO_LOWEST and
1183               op.priority >= constants.OP_PRIO_HIGHEST)
1184
1185       waitjob = None
1186
1187       if op.status != constants.OP_STATUS_CANCELING:
1188         assert op.status in (constants.OP_STATUS_QUEUED,
1189                              constants.OP_STATUS_WAITING)
1190
1191         # Prepare to start opcode
1192         if self._MarkWaitlock(job, op):
1193           # Write to disk
1194           queue.UpdateJobUnlocked(job)
1195
1196         assert op.status == constants.OP_STATUS_WAITING
1197         assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1198         assert job.start_timestamp and op.start_timestamp
1199         assert waitjob is None
1200
1201         # Check if waiting for a job is necessary
1202         waitjob = self._CheckDependencies(queue, job, opctx)
1203
1204         assert op.status in (constants.OP_STATUS_WAITING,
1205                              constants.OP_STATUS_CANCELING,
1206                              constants.OP_STATUS_ERROR)
1207
1208         if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1209                                          constants.OP_STATUS_ERROR)):
1210           logging.info("%s: opcode %s waiting for locks",
1211                        opctx.log_prefix, opctx.summary)
1212
1213           assert not opctx.jobdeps, "Not all dependencies were removed"
1214
1215           queue.release()
1216           try:
1217             (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1218           finally:
1219             queue.acquire(shared=1)
1220
1221           op.status = op_status
1222           op.result = op_result
1223
1224           assert not waitjob
1225
1226         if op.status in (constants.OP_STATUS_WAITING,
1227                          constants.OP_STATUS_QUEUED):
1228           # waiting: Couldn't get locks in time
1229           # queued: Queue is shutting down
1230           assert not op.end_timestamp
1231         else:
1232           # Finalize opcode
1233           op.end_timestamp = TimeStampNow()
1234
1235           if op.status == constants.OP_STATUS_CANCELING:
1236             assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1237                                   for i in job.ops[opctx.index:])
1238           else:
1239             assert op.status in constants.OPS_FINALIZED
1240
1241       if op.status == constants.OP_STATUS_QUEUED:
1242         # Queue is shutting down
1243         assert not waitjob
1244
1245         finalize = False
1246
1247         # Reset context
1248         job.cur_opctx = None
1249
1250         # In no case must the status be finalized here
1251         assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1252
1253       elif op.status == constants.OP_STATUS_WAITING or waitjob:
1254         finalize = False
1255
1256         if not waitjob and opctx.CheckPriorityIncrease():
1257           # Priority was changed, need to update on-disk file
1258           queue.UpdateJobUnlocked(job)
1259
1260         # Keep around for another round
1261         job.cur_opctx = opctx
1262
1263         assert (op.priority <= constants.OP_PRIO_LOWEST and
1264                 op.priority >= constants.OP_PRIO_HIGHEST)
1265
1266         # In no case must the status be finalized here
1267         assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1268
1269       else:
1270         # Ensure all opcodes so far have been successful
1271         assert (opctx.index == 0 or
1272                 compat.all(i.status == constants.OP_STATUS_SUCCESS
1273                            for i in job.ops[:opctx.index]))
1274
1275         # Reset context
1276         job.cur_opctx = None
1277
1278         if op.status == constants.OP_STATUS_SUCCESS:
1279           finalize = False
1280
1281         elif op.status == constants.OP_STATUS_ERROR:
1282           # Ensure failed opcode has an exception as its result
1283           assert errors.GetEncodedError(job.ops[opctx.index].result)
1284
1285           to_encode = errors.OpExecError("Preceding opcode failed")
1286           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1287                                 _EncodeOpError(to_encode))
1288           finalize = True
1289
1290           # Consistency check
1291           assert compat.all(i.status == constants.OP_STATUS_ERROR and
1292                             errors.GetEncodedError(i.result)
1293                             for i in job.ops[opctx.index:])
1294
1295         elif op.status == constants.OP_STATUS_CANCELING:
1296           job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1297                                 "Job canceled by request")
1298           finalize = True
1299
1300         else:
1301           raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1302
1303         if opctx.index == (opcount - 1):
1304           # Finalize on last opcode
1305           finalize = True
1306
1307         if finalize:
1308           # All opcodes have been run, finalize job
1309           job.Finalize()
1310
1311         # Write to disk. If the job status is final, this is the final write
1312         # allowed. Once the file has been written, it can be archived anytime.
1313         queue.UpdateJobUnlocked(job)
1314
1315         assert not waitjob
1316
1317         if finalize:
1318           logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1319           return self.FINISHED
1320
1321       assert not waitjob or queue.depmgr.JobWaiting(job)
1322
1323       if waitjob:
1324         return self.WAITDEP
1325       else:
1326         return self.DEFER
1327     finally:
1328       assert job.writable, "Job became read-only while being processed"
1329       queue.release()
1330
1331
1332 def _EvaluateJobProcessorResult(depmgr, job, result):
1333   """Looks at a result from L{_JobProcessor} for a job.
1334
1335   To be used in a L{_JobQueueWorker}.
1336
1337   """
1338   if result == _JobProcessor.FINISHED:
1339     # Notify waiting jobs
1340     depmgr.NotifyWaiters(job.id)
1341
1342   elif result == _JobProcessor.DEFER:
1343     # Schedule again
1344     raise workerpool.DeferTask(priority=job.CalcPriority())
1345
1346   elif result == _JobProcessor.WAITDEP:
1347     # No-op, dependency manager will re-schedule
1348     pass
1349
1350   else:
1351     raise errors.ProgrammerError("Job processor returned unknown status %s" %
1352                                  (result, ))
1353
1354
1355 class _JobQueueWorker(workerpool.BaseWorker):
1356   """The actual job workers.
1357
1358   """
1359   def RunTask(self, job): # pylint: disable=W0221
1360     """Job executor.
1361
1362     @type job: L{_QueuedJob}
1363     @param job: the job to be processed
1364
1365     """
1366     assert job.writable, "Expected writable job"
1367
1368     # Ensure only one worker is active on a single job. If a job registers for
1369     # a dependency job, and the other job notifies before the first worker is
1370     # done, the job can end up in the tasklist more than once.
1371     job.processor_lock.acquire()
1372     try:
1373       return self._RunTaskInner(job)
1374     finally:
1375       job.processor_lock.release()
1376
1377   def _RunTaskInner(self, job):
1378     """Executes a job.
1379
1380     Must be called with per-job lock acquired.
1381
1382     """
1383     queue = job.queue
1384     assert queue == self.pool.queue
1385
1386     setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1387     setname_fn(None)
1388
1389     proc = mcpu.Processor(queue.context, job.id)
1390
1391     # Create wrapper for setting thread name
1392     wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1393                                     proc.ExecOpCode)
1394
1395     _EvaluateJobProcessorResult(queue.depmgr, job,
1396                                 _JobProcessor(queue, wrap_execop_fn, job)())
1397
1398   @staticmethod
1399   def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1400     """Updates the worker thread name to include a short summary of the opcode.
1401
1402     @param setname_fn: Callable setting worker thread name
1403     @param execop_fn: Callable for executing opcode (usually
1404                       L{mcpu.Processor.ExecOpCode})
1405
1406     """
1407     setname_fn(op)
1408     try:
1409       return execop_fn(op, *args, **kwargs)
1410     finally:
1411       setname_fn(None)
1412
1413   @staticmethod
1414   def _GetWorkerName(job, op):
1415     """Sets the worker thread name.
1416
1417     @type job: L{_QueuedJob}
1418     @type op: L{opcodes.OpCode}
1419
1420     """
1421     parts = ["Job%s" % job.id]
1422
1423     if op:
1424       parts.append(op.TinySummary())
1425
1426     return "/".join(parts)
1427
1428
1429 class _JobQueueWorkerPool(workerpool.WorkerPool):
1430   """Simple class implementing a job-processing workerpool.
1431
1432   """
1433   def __init__(self, queue):
1434     super(_JobQueueWorkerPool, self).__init__("Jq",
1435                                               JOBQUEUE_THREADS,
1436                                               _JobQueueWorker)
1437     self.queue = queue
1438
1439
1440 class _JobDependencyManager:
1441   """Keeps track of job dependencies.
1442
1443   """
1444   (WAIT,
1445    ERROR,
1446    CANCEL,
1447    CONTINUE,
1448    WRONGSTATUS) = range(1, 6)
1449
1450   def __init__(self, getstatus_fn, enqueue_fn):
1451     """Initializes this class.
1452
1453     """
1454     self._getstatus_fn = getstatus_fn
1455     self._enqueue_fn = enqueue_fn
1456
1457     self._waiters = {}
1458     self._lock = locking.SharedLock("JobDepMgr")
1459
1460   @locking.ssynchronized(_LOCK, shared=1)
1461   def GetLockInfo(self, requested): # pylint: disable=W0613
1462     """Retrieves information about waiting jobs.
1463
1464     @type requested: set
1465     @param requested: Requested information, see C{query.LQ_*}
1466
1467     """
1468     # No need to sort here, that's being done by the lock manager and query
1469     # library. There are no priorities for notifying jobs, hence all show up as
1470     # one item under "pending".
1471     return [("job/%s" % job_id, None, None,
1472              [("job", [job.id for job in waiters])])
1473             for job_id, waiters in self._waiters.items()
1474             if waiters]
1475
1476   @locking.ssynchronized(_LOCK, shared=1)
1477   def JobWaiting(self, job):
1478     """Checks if a job is waiting.
1479
1480     """
1481     return compat.any(job in jobs
1482                       for jobs in self._waiters.values())
1483
1484   @locking.ssynchronized(_LOCK)
1485   def CheckAndRegister(self, job, dep_job_id, dep_status):
1486     """Checks if a dependency job has the requested status.
1487
1488     If the other job is not yet in a finalized status, the calling job will be
1489     notified (re-added to the workerpool) at a later point.
1490
1491     @type job: L{_QueuedJob}
1492     @param job: Job object
1493     @type dep_job_id: int
1494     @param dep_job_id: ID of dependency job
1495     @type dep_status: list
1496     @param dep_status: Required status
1497
1498     """
1499     assert ht.TJobId(job.id)
1500     assert ht.TJobId(dep_job_id)
1501     assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1502
1503     if job.id == dep_job_id:
1504       return (self.ERROR, "Job can't depend on itself")
1505
1506     # Get status of dependency job
1507     try:
1508       status = self._getstatus_fn(dep_job_id)
1509     except errors.JobLost, err:
1510       return (self.ERROR, "Dependency error: %s" % err)
1511
1512     assert status in constants.JOB_STATUS_ALL
1513
1514     job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1515
1516     if status not in constants.JOBS_FINALIZED:
1517       # Register for notification and wait for job to finish
1518       job_id_waiters.add(job)
1519       return (self.WAIT,
1520               "Need to wait for job %s, wanted status '%s'" %
1521               (dep_job_id, dep_status))
1522
1523     # Remove from waiters list
1524     if job in job_id_waiters:
1525       job_id_waiters.remove(job)
1526
1527     if (status == constants.JOB_STATUS_CANCELED and
1528         constants.JOB_STATUS_CANCELED not in dep_status):
1529       return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1530
1531     elif not dep_status or status in dep_status:
1532       return (self.CONTINUE,
1533               "Dependency job %s finished with status '%s'" %
1534               (dep_job_id, status))
1535
1536     else:
1537       return (self.WRONGSTATUS,
1538               "Dependency job %s finished with status '%s',"
1539               " not one of '%s' as required" %
1540               (dep_job_id, status, utils.CommaJoin(dep_status)))
1541
1542   def _RemoveEmptyWaitersUnlocked(self):
1543     """Remove all jobs without actual waiters.
1544
1545     """
1546     for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1547                    if not waiters]:
1548       del self._waiters[job_id]
1549
1550   def NotifyWaiters(self, job_id):
1551     """Notifies all jobs waiting for a certain job ID.
1552
1553     @attention: Do not call until L{CheckAndRegister} returned a status other
1554       than C{WAITDEP} for C{job_id}, or behaviour is undefined
1555     @type job_id: int
1556     @param job_id: Job ID
1557
1558     """
1559     assert ht.TJobId(job_id)
1560
1561     self._lock.acquire()
1562     try:
1563       self._RemoveEmptyWaitersUnlocked()
1564
1565       jobs = self._waiters.pop(job_id, None)
1566     finally:
1567       self._lock.release()
1568
1569     if jobs:
1570       # Re-add jobs to workerpool
1571       logging.debug("Re-adding %s jobs which were waiting for job %s",
1572                     len(jobs), job_id)
1573       self._enqueue_fn(jobs)
1574
1575
1576 def _RequireOpenQueue(fn):
1577   """Decorator for "public" functions.
1578
1579   This function should be used for all 'public' functions. That is,
1580   functions usually called from other classes. Note that this should
1581   be applied only to methods (not plain functions), since it expects
1582   that the decorated function is called with a first argument that has
1583   a '_queue_filelock' argument.
1584
1585   @warning: Use this decorator only after locking.ssynchronized
1586
1587   Example::
1588     @locking.ssynchronized(_LOCK)
1589     @_RequireOpenQueue
1590     def Example(self):
1591       pass
1592
1593   """
1594   def wrapper(self, *args, **kwargs):
1595     # pylint: disable=W0212
1596     assert self._queue_filelock is not None, "Queue should be open"
1597     return fn(self, *args, **kwargs)
1598   return wrapper
1599
1600
1601 def _RequireNonDrainedQueue(fn):
1602   """Decorator checking for a non-drained queue.
1603
1604   To be used with functions submitting new jobs.
1605
1606   """
1607   def wrapper(self, *args, **kwargs):
1608     """Wrapper function.
1609
1610     @raise errors.JobQueueDrainError: if the job queue is marked for draining
1611
1612     """
1613     # Ok when sharing the big job queue lock, as the drain file is created when
1614     # the lock is exclusive.
1615     # Needs access to protected member, pylint: disable=W0212
1616     if self._drained:
1617       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1618
1619     if not self._accepting_jobs:
1620       raise errors.JobQueueError("Job queue is shutting down, refusing job")
1621
1622     return fn(self, *args, **kwargs)
1623   return wrapper
1624
1625
1626 class JobQueue(object):
1627   """Queue used to manage the jobs.
1628
1629   """
1630   def __init__(self, context):
1631     """Constructor for JobQueue.
1632
1633     The constructor will initialize the job queue object and then
1634     start loading the current jobs from disk, either for starting them
1635     (if they were queue) or for aborting them (if they were already
1636     running).
1637
1638     @type context: GanetiContext
1639     @param context: the context object for access to the configuration
1640         data and other ganeti objects
1641
1642     """
1643     self.context = context
1644     self._memcache = weakref.WeakValueDictionary()
1645     self._my_hostname = netutils.Hostname.GetSysName()
1646
1647     # The Big JobQueue lock. If a code block or method acquires it in shared
1648     # mode safe it must guarantee concurrency with all the code acquiring it in
1649     # shared mode, including itself. In order not to acquire it at all
1650     # concurrency must be guaranteed with all code acquiring it in shared mode
1651     # and all code acquiring it exclusively.
1652     self._lock = locking.SharedLock("JobQueue")
1653
1654     self.acquire = self._lock.acquire
1655     self.release = self._lock.release
1656
1657     # Accept jobs by default
1658     self._accepting_jobs = True
1659
1660     # Initialize the queue, and acquire the filelock.
1661     # This ensures no other process is working on the job queue.
1662     self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1663
1664     # Read serial file
1665     self._last_serial = jstore.ReadSerial()
1666     assert self._last_serial is not None, ("Serial file was modified between"
1667                                            " check in jstore and here")
1668
1669     # Get initial list of nodes
1670     self._nodes = dict((n.name, n.primary_ip)
1671                        for n in self.context.cfg.GetAllNodesInfo().values()
1672                        if n.master_candidate)
1673
1674     # Remove master node
1675     self._nodes.pop(self._my_hostname, None)
1676
1677     # TODO: Check consistency across nodes
1678
1679     self._queue_size = None
1680     self._UpdateQueueSizeUnlocked()
1681     assert ht.TInt(self._queue_size)
1682     self._drained = jstore.CheckDrainFlag()
1683
1684     # Job dependencies
1685     self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1686                                         self._EnqueueJobs)
1687     self.context.glm.AddToLockMonitor(self.depmgr)
1688
1689     # Setup worker pool
1690     self._wpool = _JobQueueWorkerPool(self)
1691     try:
1692       self._InspectQueue()
1693     except:
1694       self._wpool.TerminateWorkers()
1695       raise
1696
1697   @locking.ssynchronized(_LOCK)
1698   @_RequireOpenQueue
1699   def _InspectQueue(self):
1700     """Loads the whole job queue and resumes unfinished jobs.
1701
1702     This function needs the lock here because WorkerPool.AddTask() may start a
1703     job while we're still doing our work.
1704
1705     """
1706     logging.info("Inspecting job queue")
1707
1708     restartjobs = []
1709
1710     all_job_ids = self._GetJobIDsUnlocked()
1711     jobs_count = len(all_job_ids)
1712     lastinfo = time.time()
1713     for idx, job_id in enumerate(all_job_ids):
1714       # Give an update every 1000 jobs or 10 seconds
1715       if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1716           idx == (jobs_count - 1)):
1717         logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1718                      idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1719         lastinfo = time.time()
1720
1721       job = self._LoadJobUnlocked(job_id)
1722
1723       # a failure in loading the job can cause 'None' to be returned
1724       if job is None:
1725         continue
1726
1727       status = job.CalcStatus()
1728
1729       if status == constants.JOB_STATUS_QUEUED:
1730         restartjobs.append(job)
1731
1732       elif status in (constants.JOB_STATUS_RUNNING,
1733                       constants.JOB_STATUS_WAITING,
1734                       constants.JOB_STATUS_CANCELING):
1735         logging.warning("Unfinished job %s found: %s", job.id, job)
1736
1737         if status == constants.JOB_STATUS_WAITING:
1738           # Restart job
1739           job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1740           restartjobs.append(job)
1741         else:
1742           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1743                                 "Unclean master daemon shutdown")
1744           job.Finalize()
1745
1746         self.UpdateJobUnlocked(job)
1747
1748     if restartjobs:
1749       logging.info("Restarting %s jobs", len(restartjobs))
1750       self._EnqueueJobsUnlocked(restartjobs)
1751
1752     logging.info("Job queue inspection finished")
1753
1754   def _GetRpc(self, address_list):
1755     """Gets RPC runner with context.
1756
1757     """
1758     return rpc.JobQueueRunner(self.context, address_list)
1759
1760   @locking.ssynchronized(_LOCK)
1761   @_RequireOpenQueue
1762   def AddNode(self, node):
1763     """Register a new node with the queue.
1764
1765     @type node: L{objects.Node}
1766     @param node: the node object to be added
1767
1768     """
1769     node_name = node.name
1770     assert node_name != self._my_hostname
1771
1772     # Clean queue directory on added node
1773     result = self._GetRpc(None).call_jobqueue_purge(node_name)
1774     msg = result.fail_msg
1775     if msg:
1776       logging.warning("Cannot cleanup queue directory on node %s: %s",
1777                       node_name, msg)
1778
1779     if not node.master_candidate:
1780       # remove if existing, ignoring errors
1781       self._nodes.pop(node_name, None)
1782       # and skip the replication of the job ids
1783       return
1784
1785     # Upload the whole queue excluding archived jobs
1786     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1787
1788     # Upload current serial file
1789     files.append(pathutils.JOB_QUEUE_SERIAL_FILE)
1790
1791     # Static address list
1792     addrs = [node.primary_ip]
1793
1794     for file_name in files:
1795       # Read file content
1796       content = utils.ReadFile(file_name)
1797
1798       result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
1799                              file_name, content)
1800       msg = result[node_name].fail_msg
1801       if msg:
1802         logging.error("Failed to upload file %s to node %s: %s",
1803                       file_name, node_name, msg)
1804
1805     # Set queue drained flag
1806     result = \
1807       self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name],
1808                                                        self._drained)
1809     msg = result[node_name].fail_msg
1810     if msg:
1811       logging.error("Failed to set queue drained flag on node %s: %s",
1812                     node_name, msg)
1813
1814     self._nodes[node_name] = node.primary_ip
1815
1816   @locking.ssynchronized(_LOCK)
1817   @_RequireOpenQueue
1818   def RemoveNode(self, node_name):
1819     """Callback called when removing nodes from the cluster.
1820
1821     @type node_name: str
1822     @param node_name: the name of the node to remove
1823
1824     """
1825     self._nodes.pop(node_name, None)
1826
1827   @staticmethod
1828   def _CheckRpcResult(result, nodes, failmsg):
1829     """Verifies the status of an RPC call.
1830
1831     Since we aim to keep consistency should this node (the current
1832     master) fail, we will log errors if our rpc fail, and especially
1833     log the case when more than half of the nodes fails.
1834
1835     @param result: the data as returned from the rpc call
1836     @type nodes: list
1837     @param nodes: the list of nodes we made the call to
1838     @type failmsg: str
1839     @param failmsg: the identifier to be used for logging
1840
1841     """
1842     failed = []
1843     success = []
1844
1845     for node in nodes:
1846       msg = result[node].fail_msg
1847       if msg:
1848         failed.append(node)
1849         logging.error("RPC call %s (%s) failed on node %s: %s",
1850                       result[node].call, failmsg, node, msg)
1851       else:
1852         success.append(node)
1853
1854     # +1 for the master node
1855     if (len(success) + 1) < len(failed):
1856       # TODO: Handle failing nodes
1857       logging.error("More than half of the nodes failed")
1858
1859   def _GetNodeIp(self):
1860     """Helper for returning the node name/ip list.
1861
1862     @rtype: (list, list)
1863     @return: a tuple of two lists, the first one with the node
1864         names and the second one with the node addresses
1865
1866     """
1867     # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1868     name_list = self._nodes.keys()
1869     addr_list = [self._nodes[name] for name in name_list]
1870     return name_list, addr_list
1871
1872   def _UpdateJobQueueFile(self, file_name, data, replicate):
1873     """Writes a file locally and then replicates it to all nodes.
1874
1875     This function will replace the contents of a file on the local
1876     node and then replicate it to all the other nodes we have.
1877
1878     @type file_name: str
1879     @param file_name: the path of the file to be replicated
1880     @type data: str
1881     @param data: the new contents of the file
1882     @type replicate: boolean
1883     @param replicate: whether to spread the changes to the remote nodes
1884
1885     """
1886     getents = runtime.GetEnts()
1887     utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1888                     gid=getents.masterd_gid)
1889
1890     if replicate:
1891       names, addrs = self._GetNodeIp()
1892       result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
1893       self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1894
1895   def _RenameFilesUnlocked(self, rename):
1896     """Renames a file locally and then replicate the change.
1897
1898     This function will rename a file in the local queue directory
1899     and then replicate this rename to all the other nodes we have.
1900
1901     @type rename: list of (old, new)
1902     @param rename: List containing tuples mapping old to new names
1903
1904     """
1905     # Rename them locally
1906     for old, new in rename:
1907       utils.RenameFile(old, new, mkdir=True)
1908
1909     # ... and on all nodes
1910     names, addrs = self._GetNodeIp()
1911     result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
1912     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1913
1914   def _NewSerialsUnlocked(self, count):
1915     """Generates a new job identifier.
1916
1917     Job identifiers are unique during the lifetime of a cluster.
1918
1919     @type count: integer
1920     @param count: how many serials to return
1921     @rtype: list of int
1922     @return: a list of job identifiers.
1923
1924     """
1925     assert ht.TNonNegativeInt(count)
1926
1927     # New number
1928     serial = self._last_serial + count
1929
1930     # Write to file
1931     self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
1932                              "%s\n" % serial, True)
1933
1934     result = [jstore.FormatJobID(v)
1935               for v in range(self._last_serial + 1, serial + 1)]
1936
1937     # Keep it only if we were able to write the file
1938     self._last_serial = serial
1939
1940     assert len(result) == count
1941
1942     return result
1943
1944   @staticmethod
1945   def _GetJobPath(job_id):
1946     """Returns the job file for a given job id.
1947
1948     @type job_id: str
1949     @param job_id: the job identifier
1950     @rtype: str
1951     @return: the path to the job file
1952
1953     """
1954     return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
1955
1956   @staticmethod
1957   def _GetArchivedJobPath(job_id):
1958     """Returns the archived job file for a give job id.
1959
1960     @type job_id: str
1961     @param job_id: the job identifier
1962     @rtype: str
1963     @return: the path to the archived job file
1964
1965     """
1966     return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
1967                           jstore.GetArchiveDirectory(job_id),
1968                           "job-%s" % job_id)
1969
1970   @staticmethod
1971   def _DetermineJobDirectories(archived):
1972     """Build list of directories containing job files.
1973
1974     @type archived: bool
1975     @param archived: Whether to include directories for archived jobs
1976     @rtype: list
1977
1978     """
1979     result = [pathutils.QUEUE_DIR]
1980
1981     if archived:
1982       archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
1983       result.extend(map(compat.partial(utils.PathJoin, archive_path),
1984                         utils.ListVisibleFiles(archive_path)))
1985
1986     return result
1987
1988   @classmethod
1989   def _GetJobIDsUnlocked(cls, sort=True, archived=False):
1990     """Return all known job IDs.
1991
1992     The method only looks at disk because it's a requirement that all
1993     jobs are present on disk (so in the _memcache we don't have any
1994     extra IDs).
1995
1996     @type sort: boolean
1997     @param sort: perform sorting on the returned job ids
1998     @rtype: list
1999     @return: the list of job IDs
2000
2001     """
2002     jlist = []
2003
2004     for path in cls._DetermineJobDirectories(archived):
2005       for filename in utils.ListVisibleFiles(path):
2006         m = constants.JOB_FILE_RE.match(filename)
2007         if m:
2008           jlist.append(int(m.group(1)))
2009
2010     if sort:
2011       jlist.sort()
2012     return jlist
2013
2014   def _LoadJobUnlocked(self, job_id):
2015     """Loads a job from the disk or memory.
2016
2017     Given a job id, this will return the cached job object if
2018     existing, or try to load the job from the disk. If loading from
2019     disk, it will also add the job to the cache.
2020
2021     @type job_id: int
2022     @param job_id: the job id
2023     @rtype: L{_QueuedJob} or None
2024     @return: either None or the job object
2025
2026     """
2027     job = self._memcache.get(job_id, None)
2028     if job:
2029       logging.debug("Found job %s in memcache", job_id)
2030       assert job.writable, "Found read-only job in memcache"
2031       return job
2032
2033     try:
2034       job = self._LoadJobFromDisk(job_id, False)
2035       if job is None:
2036         return job
2037     except errors.JobFileCorrupted:
2038       old_path = self._GetJobPath(job_id)
2039       new_path = self._GetArchivedJobPath(job_id)
2040       if old_path == new_path:
2041         # job already archived (future case)
2042         logging.exception("Can't parse job %s", job_id)
2043       else:
2044         # non-archived case
2045         logging.exception("Can't parse job %s, will archive.", job_id)
2046         self._RenameFilesUnlocked([(old_path, new_path)])
2047       return None
2048
2049     assert job.writable, "Job just loaded is not writable"
2050
2051     self._memcache[job_id] = job
2052     logging.debug("Added job %s to the cache", job_id)
2053     return job
2054
2055   def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
2056     """Load the given job file from disk.
2057
2058     Given a job file, read, load and restore it in a _QueuedJob format.
2059
2060     @type job_id: int
2061     @param job_id: job identifier
2062     @type try_archived: bool
2063     @param try_archived: Whether to try loading an archived job
2064     @rtype: L{_QueuedJob} or None
2065     @return: either None or the job object
2066
2067     """
2068     path_functions = [(self._GetJobPath, False)]
2069
2070     if try_archived:
2071       path_functions.append((self._GetArchivedJobPath, True))
2072
2073     raw_data = None
2074     archived = None
2075
2076     for (fn, archived) in path_functions:
2077       filepath = fn(job_id)
2078       logging.debug("Loading job from %s", filepath)
2079       try:
2080         raw_data = utils.ReadFile(filepath)
2081       except EnvironmentError, err:
2082         if err.errno != errno.ENOENT:
2083           raise
2084       else:
2085         break
2086
2087     if not raw_data:
2088       return None
2089
2090     if writable is None:
2091       writable = not archived
2092
2093     try:
2094       data = serializer.LoadJson(raw_data)
2095       job = _QueuedJob.Restore(self, data, writable, archived)
2096     except Exception, err: # pylint: disable=W0703
2097       raise errors.JobFileCorrupted(err)
2098
2099     return job
2100
2101   def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
2102     """Load the given job file from disk.
2103
2104     Given a job file, read, load and restore it in a _QueuedJob format.
2105     In case of error reading the job, it gets returned as None, and the
2106     exception is logged.
2107
2108     @type job_id: int
2109     @param job_id: job identifier
2110     @type try_archived: bool
2111     @param try_archived: Whether to try loading an archived job
2112     @rtype: L{_QueuedJob} or None
2113     @return: either None or the job object
2114
2115     """
2116     try:
2117       return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
2118     except (errors.JobFileCorrupted, EnvironmentError):
2119       logging.exception("Can't load/parse job %s", job_id)
2120       return None
2121
2122   def _UpdateQueueSizeUnlocked(self):
2123     """Update the queue size.
2124
2125     """
2126     self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
2127
2128   @locking.ssynchronized(_LOCK)
2129   @_RequireOpenQueue
2130   def SetDrainFlag(self, drain_flag):
2131     """Sets the drain flag for the queue.
2132
2133     @type drain_flag: boolean
2134     @param drain_flag: Whether to set or unset the drain flag
2135
2136     """
2137     # Change flag locally
2138     jstore.SetDrainFlag(drain_flag)
2139
2140     self._drained = drain_flag
2141
2142     # ... and on all nodes
2143     (names, addrs) = self._GetNodeIp()
2144     result = \
2145       self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag)
2146     self._CheckRpcResult(result, self._nodes,
2147                          "Setting queue drain flag to %s" % drain_flag)
2148
2149     return True
2150
2151   @_RequireOpenQueue
2152   def _SubmitJobUnlocked(self, job_id, ops):
2153     """Create and store a new job.
2154
2155     This enters the job into our job queue and also puts it on the new
2156     queue, in order for it to be picked up by the queue processors.
2157
2158     @type job_id: job ID
2159     @param job_id: the job ID for the new job
2160     @type ops: list
2161     @param ops: The list of OpCodes that will become the new job.
2162     @rtype: L{_QueuedJob}
2163     @return: the job object to be queued
2164     @raise errors.JobQueueFull: if the job queue has too many jobs in it
2165     @raise errors.GenericError: If an opcode is not valid
2166
2167     """
2168     if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
2169       raise errors.JobQueueFull()
2170
2171     job = _QueuedJob(self, job_id, ops, True)
2172
2173     for idx, op in enumerate(job.ops):
2174       # Check priority
2175       if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
2176         allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2177         raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
2178                                   " are %s" % (idx, op.priority, allowed))
2179
2180       # Check job dependencies
2181       dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
2182       if not opcodes.TNoRelativeJobDependencies(dependencies):
2183         raise errors.GenericError("Opcode %s has invalid dependencies, must"
2184                                   " match %s: %s" %
2185                                   (idx, opcodes.TNoRelativeJobDependencies,
2186                                    dependencies))
2187
2188     # Write to disk
2189     self.UpdateJobUnlocked(job)
2190
2191     self._queue_size += 1
2192
2193     logging.debug("Adding new job %s to the cache", job_id)
2194     self._memcache[job_id] = job
2195
2196     return job
2197
2198   @locking.ssynchronized(_LOCK)
2199   @_RequireOpenQueue
2200   @_RequireNonDrainedQueue
2201   def SubmitJob(self, ops):
2202     """Create and store a new job.
2203
2204     @see: L{_SubmitJobUnlocked}
2205
2206     """
2207     (job_id, ) = self._NewSerialsUnlocked(1)
2208     self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
2209     return job_id
2210
2211   @locking.ssynchronized(_LOCK)
2212   @_RequireOpenQueue
2213   @_RequireNonDrainedQueue
2214   def SubmitManyJobs(self, jobs):
2215     """Create and store multiple jobs.
2216
2217     @see: L{_SubmitJobUnlocked}
2218
2219     """
2220     all_job_ids = self._NewSerialsUnlocked(len(jobs))
2221
2222     (results, added_jobs) = \
2223       self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])
2224
2225     self._EnqueueJobsUnlocked(added_jobs)
2226
2227     return results
2228
2229   @staticmethod
2230   def _FormatSubmitError(msg, ops):
2231     """Formats errors which occurred while submitting a job.
2232
2233     """
2234     return ("%s; opcodes %s" %
2235             (msg, utils.CommaJoin(op.Summary() for op in ops)))
2236
2237   @staticmethod
2238   def _ResolveJobDependencies(resolve_fn, deps):
2239     """Resolves relative job IDs in dependencies.
2240
2241     @type resolve_fn: callable
2242     @param resolve_fn: Function to resolve a relative job ID
2243     @type deps: list
2244     @param deps: Dependencies
2245     @rtype: tuple; (boolean, string or list)
2246     @return: If successful (first tuple item), the returned list contains
2247       resolved job IDs along with the requested status; if not successful,
2248       the second element is an error message
2249
2250     """
2251     result = []
2252
2253     for (dep_job_id, dep_status) in deps:
2254       if ht.TRelativeJobId(dep_job_id):
2255         assert ht.TInt(dep_job_id) and dep_job_id < 0
2256         try:
2257           job_id = resolve_fn(dep_job_id)
2258         except IndexError:
2259           # Abort
2260           return (False, "Unable to resolve relative job ID %s" % dep_job_id)
2261       else:
2262         job_id = dep_job_id
2263
2264       result.append((job_id, dep_status))
2265
2266     return (True, result)
2267
2268   def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
2269     """Create and store multiple jobs.
2270
2271     @see: L{_SubmitJobUnlocked}
2272
2273     """
2274     results = []
2275     added_jobs = []
2276
2277     def resolve_fn(job_idx, reljobid):
2278       assert reljobid < 0
2279       return (previous_job_ids + job_ids[:job_idx])[reljobid]
2280
2281     for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
2282       for op in ops:
2283         if getattr(op, opcodes.DEPEND_ATTR, None):
2284           (status, data) = \
2285             self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
2286                                          op.depends)
2287           if not status:
2288             # Abort resolving dependencies
2289             assert ht.TNonEmptyString(data), "No error message"
2290             break
2291           # Use resolved dependencies
2292           op.depends = data
2293       else:
2294         try:
2295           job = self._SubmitJobUnlocked(job_id, ops)
2296         except errors.GenericError, err:
2297           status = False
2298           data = self._FormatSubmitError(str(err), ops)
2299         else:
2300           status = True
2301           data = job_id
2302           added_jobs.append(job)
2303
2304       results.append((status, data))
2305
2306     return (results, added_jobs)
2307
2308   @locking.ssynchronized(_LOCK)
2309   def _EnqueueJobs(self, jobs):
2310     """Helper function to add jobs to worker pool's queue.
2311
2312     @type jobs: list
2313     @param jobs: List of all jobs
2314
2315     """
2316     return self._EnqueueJobsUnlocked(jobs)
2317
2318   def _EnqueueJobsUnlocked(self, jobs):
2319     """Helper function to add jobs to worker pool's queue.
2320
2321     @type jobs: list
2322     @param jobs: List of all jobs
2323
2324     """
2325     assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
2326     self._wpool.AddManyTasks([(job, ) for job in jobs],
2327                              priority=[job.CalcPriority() for job in jobs],
2328                              task_id=map(_GetIdAttr, jobs))
2329
2330   def _GetJobStatusForDependencies(self, job_id):
2331     """Gets the status of a job for dependencies.
2332
2333     @type job_id: int
2334     @param job_id: Job ID
2335     @raise errors.JobLost: If job can't be found
2336
2337     """
2338     # Not using in-memory cache as doing so would require an exclusive lock
2339
2340     # Try to load from disk
2341     job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2342
2343     assert not job.writable, "Got writable job" # pylint: disable=E1101
2344
2345     if job:
2346       return job.CalcStatus()
2347
2348     raise errors.JobLost("Job %s not found" % job_id)
2349
2350   @_RequireOpenQueue
2351   def UpdateJobUnlocked(self, job, replicate=True):
2352     """Update a job's on disk storage.
2353
2354     After a job has been modified, this function needs to be called in
2355     order to write the changes to disk and replicate them to the other
2356     nodes.
2357
2358     @type job: L{_QueuedJob}
2359     @param job: the changed job
2360     @type replicate: boolean
2361     @param replicate: whether to replicate the change to remote nodes
2362
2363     """
2364     if __debug__:
2365       finalized = job.CalcStatus() in constants.JOBS_FINALIZED
2366       assert (finalized ^ (job.end_timestamp is None))
2367       assert job.writable, "Can't update read-only job"
2368       assert not job.archived, "Can't update archived job"
2369
2370     filename = self._GetJobPath(job.id)
2371     data = serializer.DumpJson(job.Serialize())
2372     logging.debug("Writing job %s to %s", job.id, filename)
2373     self._UpdateJobQueueFile(filename, data, replicate)
2374
2375   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
2376                         timeout):
2377     """Waits for changes in a job.
2378
2379     @type job_id: int
2380     @param job_id: Job identifier
2381     @type fields: list of strings
2382     @param fields: Which fields to check for changes
2383     @type prev_job_info: list or None
2384     @param prev_job_info: Last job information returned
2385     @type prev_log_serial: int
2386     @param prev_log_serial: Last job message serial number
2387     @type timeout: float
2388     @param timeout: maximum time to wait in seconds
2389     @rtype: tuple (job info, log entries)
2390     @return: a tuple of the job information as required via
2391         the fields parameter, and the log entries as a list
2392
2393         if the job has not changed and the timeout has expired,
2394         we instead return a special value,
2395         L{constants.JOB_NOTCHANGED}, which should be interpreted
2396         as such by the clients
2397
2398     """
2399     load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, True,
2400                              writable=False)
2401
2402     helper = _WaitForJobChangesHelper()
2403
2404     return helper(self._GetJobPath(job_id), load_fn,
2405                   fields, prev_job_info, prev_log_serial, timeout)
2406
2407   @locking.ssynchronized(_LOCK)
2408   @_RequireOpenQueue
2409   def CancelJob(self, job_id):
2410     """Cancels a job.
2411
2412     This will only succeed if the job has not started yet.
2413
2414     @type job_id: int
2415     @param job_id: job ID of job to be cancelled.
2416
2417     """
2418     logging.info("Cancelling job %s", job_id)
2419
2420     return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
2421
2422   @locking.ssynchronized(_LOCK)
2423   @_RequireOpenQueue
2424   def ChangeJobPriority(self, job_id, priority):
2425     """Changes a job's priority.
2426
2427     @type job_id: int
2428     @param job_id: ID of the job whose priority should be changed
2429     @type priority: int
2430     @param priority: New priority
2431
2432     """
2433     logging.info("Changing priority of job %s to %s", job_id, priority)
2434
2435     if priority not in constants.OP_PRIO_SUBMIT_VALID:
2436       allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2437       raise errors.GenericError("Invalid priority %s, allowed are %s" %
2438                                 (priority, allowed))
2439
2440     def fn(job):
2441       (success, msg) = job.ChangePriority(priority)
2442
2443       if success:
2444         try:
2445           self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
2446         except workerpool.NoSuchTask:
2447           logging.debug("Job %s is not in workerpool at this time", job.id)
2448
2449       return (success, msg)
2450
2451     return self._ModifyJobUnlocked(job_id, fn)
2452
2453   def _ModifyJobUnlocked(self, job_id, mod_fn):
2454     """Modifies a job.
2455
2456     @type job_id: int
2457     @param job_id: Job ID
2458     @type mod_fn: callable
2459     @param mod_fn: Modifying function, receiving job object as parameter,
2460       returning tuple of (status boolean, message string)
2461
2462     """
2463     job = self._LoadJobUnlocked(job_id)
2464     if not job:
2465       logging.debug("Job %s not found", job_id)
2466       return (False, "Job %s not found" % job_id)
2467
2468     assert job.writable, "Can't modify read-only job"
2469     assert not job.archived, "Can't modify archived job"
2470
2471     (success, msg) = mod_fn(job)
2472
2473     if success:
2474       # If the job was finalized (e.g. cancelled), this is the final write
2475       # allowed. The job can be archived anytime.
2476       self.UpdateJobUnlocked(job)
2477
2478     return (success, msg)
2479
2480   @_RequireOpenQueue
2481   def _ArchiveJobsUnlocked(self, jobs):
2482     """Archives jobs.
2483
2484     @type jobs: list of L{_QueuedJob}
2485     @param jobs: Job objects
2486     @rtype: int
2487     @return: Number of archived jobs
2488
2489     """
2490     archive_jobs = []
2491     rename_files = []
2492     for job in jobs:
2493       assert job.writable, "Can't archive read-only job"
2494       assert not job.archived, "Can't cancel archived job"
2495
2496       if job.CalcStatus() not in constants.JOBS_FINALIZED:
2497         logging.debug("Job %s is not yet done", job.id)
2498         continue
2499
2500       archive_jobs.append(job)
2501
2502       old = self._GetJobPath(job.id)
2503       new = self._GetArchivedJobPath(job.id)
2504       rename_files.append((old, new))
2505
2506     # TODO: What if 1..n files fail to rename?
2507     self._RenameFilesUnlocked(rename_files)
2508
2509     logging.debug("Successfully archived job(s) %s",
2510                   utils.CommaJoin(job.id for job in archive_jobs))
2511
2512     # Since we haven't quite checked, above, if we succeeded or failed renaming
2513     # the files, we update the cached queue size from the filesystem. When we
2514     # get around to fix the TODO: above, we can use the number of actually
2515     # archived jobs to fix this.
2516     self._UpdateQueueSizeUnlocked()
2517     return len(archive_jobs)
2518
2519   @locking.ssynchronized(_LOCK)
2520   @_RequireOpenQueue
2521   def ArchiveJob(self, job_id):
2522     """Archives a job.
2523
2524     This is just a wrapper over L{_ArchiveJobsUnlocked}.
2525
2526     @type job_id: int
2527     @param job_id: Job ID of job to be archived.
2528     @rtype: bool
2529     @return: Whether job was archived
2530
2531     """
2532     logging.info("Archiving job %s", job_id)
2533
2534     job = self._LoadJobUnlocked(job_id)
2535     if not job:
2536       logging.debug("Job %s not found", job_id)
2537       return False
2538
2539     return self._ArchiveJobsUnlocked([job]) == 1
2540
2541   @locking.ssynchronized(_LOCK)
2542   @_RequireOpenQueue
2543   def AutoArchiveJobs(self, age, timeout):
2544     """Archives all jobs based on age.
2545
2546     The method will archive all jobs which are older than the age
2547     parameter. For jobs that don't have an end timestamp, the start
2548     timestamp will be considered. The special '-1' age will cause
2549     archival of all jobs (that are not running or queued).
2550
2551     @type age: int
2552     @param age: the minimum age in seconds
2553
2554     """
2555     logging.info("Archiving jobs with age more than %s seconds", age)
2556
2557     now = time.time()
2558     end_time = now + timeout
2559     archived_count = 0
2560     last_touched = 0
2561
2562     all_job_ids = self._GetJobIDsUnlocked()
2563     pending = []
2564     for idx, job_id in enumerate(all_job_ids):
2565       last_touched = idx + 1
2566
2567       # Not optimal because jobs could be pending
2568       # TODO: Measure average duration for job archival and take number of
2569       # pending jobs into account.
2570       if time.time() > end_time:
2571         break
2572
2573       # Returns None if the job failed to load
2574       job = self._LoadJobUnlocked(job_id)
2575       if job:
2576         if job.end_timestamp is None:
2577           if job.start_timestamp is None:
2578             job_age = job.received_timestamp
2579           else:
2580             job_age = job.start_timestamp
2581         else:
2582           job_age = job.end_timestamp
2583
2584         if age == -1 or now - job_age[0] > age:
2585           pending.append(job)
2586
2587           # Archive 10 jobs at a time
2588           if len(pending) >= 10:
2589             archived_count += self._ArchiveJobsUnlocked(pending)
2590             pending = []
2591
2592     if pending:
2593       archived_count += self._ArchiveJobsUnlocked(pending)
2594
2595     return (archived_count, len(all_job_ids) - last_touched)
2596
2597   def _Query(self, fields, qfilter):
2598     qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
2599                        namefield="id")
2600
2601     # Archived jobs are only looked at if the "archived" field is referenced
2602     # either as a requested field or in the filter. By default archived jobs
2603     # are ignored.
2604     include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())
2605
2606     job_ids = qobj.RequestedNames()
2607
2608     list_all = (job_ids is None)
2609
2610     if list_all:
2611       # Since files are added to/removed from the queue atomically, there's no
2612       # risk of getting the job ids in an inconsistent state.
2613       job_ids = self._GetJobIDsUnlocked(archived=include_archived)
2614
2615     jobs = []
2616
2617     for job_id in job_ids:
2618       job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2619       if job is not None or not list_all:
2620         jobs.append((job_id, job))
2621
2622     return (qobj, jobs, list_all)
2623
2624   def QueryJobs(self, fields, qfilter):
2625     """Returns a list of jobs in queue.
2626
2627     @type fields: sequence
2628     @param fields: List of wanted fields
2629     @type qfilter: None or query2 filter (list)
2630     @param qfilter: Query filter
2631
2632     """
2633     (qobj, ctx, _) = self._Query(fields, qfilter)
2634
2635     return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
2636
2637   def OldStyleQueryJobs(self, job_ids, fields):
2638     """Returns a list of jobs in queue.
2639
2640     @type job_ids: list
2641     @param job_ids: sequence of job identifiers or None for all
2642     @type fields: list
2643     @param fields: names of fields to return
2644     @rtype: list
2645     @return: list one element per job, each element being list with
2646         the requested fields
2647
2648     """
2649     # backwards compat:
2650     job_ids = [int(jid) for jid in job_ids]
2651     qfilter = qlang.MakeSimpleFilter("id", job_ids)
2652
2653     (qobj, ctx, _) = self._Query(fields, qfilter)
2654
2655     return qobj.OldStyleQuery(ctx, sort_by_name=False)
2656
2657   @locking.ssynchronized(_LOCK)
2658   def PrepareShutdown(self):
2659     """Prepare to stop the job queue.
2660
2661     Disables execution of jobs in the workerpool and returns whether there are
2662     any jobs currently running. If the latter is the case, the job queue is not
2663     yet ready for shutdown. Once this function returns C{True} L{Shutdown} can
2664     be called without interfering with any job. Queued and unfinished jobs will
2665     be resumed next time.
2666
2667     Once this function has been called no new job submissions will be accepted
2668     (see L{_RequireNonDrainedQueue}).
2669
2670     @rtype: bool
2671     @return: Whether there are any running jobs
2672
2673     """
2674     if self._accepting_jobs:
2675       self._accepting_jobs = False
2676
2677       # Tell worker pool to stop processing pending tasks
2678       self._wpool.SetActive(False)
2679
2680     return self._wpool.HasRunningTasks()
2681
2682   def AcceptingJobsUnlocked(self):
2683     """Returns whether jobs are accepted.
2684
2685     Once L{PrepareShutdown} has been called, no new jobs are accepted and the
2686     queue is shutting down.
2687
2688     @rtype: bool
2689
2690     """
2691     return self._accepting_jobs
2692
2693   @locking.ssynchronized(_LOCK)
2694   @_RequireOpenQueue
2695   def Shutdown(self):
2696     """Stops the job queue.
2697
2698     This shutdowns all the worker threads an closes the queue.
2699
2700     """
2701     self._wpool.TerminateWorkers()
2702
2703     self._queue_filelock.Close()
2704     self._queue_filelock = None