Implement virtual cluster support in Python code
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import logging
33 import errno
34 import time
35 import weakref
36 import threading
37 import itertools
38
39 try:
40   # pylint: disable=E0611
41   from pyinotify import pyinotify
42 except ImportError:
43   import pyinotify
44
45 from ganeti import asyncnotifier
46 from ganeti import constants
47 from ganeti import serializer
48 from ganeti import workerpool
49 from ganeti import locking
50 from ganeti import opcodes
51 from ganeti import errors
52 from ganeti import mcpu
53 from ganeti import utils
54 from ganeti import jstore
55 from ganeti import rpc
56 from ganeti import runtime
57 from ganeti import netutils
58 from ganeti import compat
59 from ganeti import ht
60 from ganeti import query
61 from ganeti import qlang
62 from ganeti import pathutils
63 from ganeti import vcluster
64
65
66 JOBQUEUE_THREADS = 25
67
68 # member lock names to be passed to @ssynchronized decorator
69 _LOCK = "_lock"
70 _QUEUE = "_queue"
71
72
73 class CancelJob(Exception):
74   """Special exception to cancel a job.
75
76   """
77
78
79 def TimeStampNow():
80   """Returns the current timestamp.
81
82   @rtype: tuple
83   @return: the current time in the (seconds, microseconds) format
84
85   """
86   return utils.SplitTime(time.time())
87
88
89 def _CallJqUpdate(runner, names, file_name, content):
90   """Updates job queue file after virtualizing filename.
91
92   """
93   virt_file_name = vcluster.MakeVirtualPath(file_name)
94   return runner.call_jobqueue_update(names, virt_file_name, content)
95
96
97 class _SimpleJobQuery:
98   """Wrapper for job queries.
99
100   Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.
101
102   """
103   def __init__(self, fields):
104     """Initializes this class.
105
106     """
107     self._query = query.Query(query.JOB_FIELDS, fields)
108
109   def __call__(self, job):
110     """Executes a job query using cached field list.
111
112     """
113     return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
114
115
116 class _QueuedOpCode(object):
117   """Encapsulates an opcode object.
118
119   @ivar log: holds the execution log and consists of tuples
120   of the form C{(log_serial, timestamp, level, message)}
121   @ivar input: the OpCode we encapsulate
122   @ivar status: the current status
123   @ivar result: the result of the LU execution
124   @ivar start_timestamp: timestamp for the start of the execution
125   @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
126   @ivar stop_timestamp: timestamp for the end of the execution
127
128   """
129   __slots__ = ["input", "status", "result", "log", "priority",
130                "start_timestamp", "exec_timestamp", "end_timestamp",
131                "__weakref__"]
132
133   def __init__(self, op):
134     """Initializes instances of this class.
135
136     @type op: L{opcodes.OpCode}
137     @param op: the opcode we encapsulate
138
139     """
140     self.input = op
141     self.status = constants.OP_STATUS_QUEUED
142     self.result = None
143     self.log = []
144     self.start_timestamp = None
145     self.exec_timestamp = None
146     self.end_timestamp = None
147
148     # Get initial priority (it might change during the lifetime of this opcode)
149     self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
150
151   @classmethod
152   def Restore(cls, state):
153     """Restore the _QueuedOpCode from the serialized form.
154
155     @type state: dict
156     @param state: the serialized state
157     @rtype: _QueuedOpCode
158     @return: a new _QueuedOpCode instance
159
160     """
161     obj = _QueuedOpCode.__new__(cls)
162     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
163     obj.status = state["status"]
164     obj.result = state["result"]
165     obj.log = state["log"]
166     obj.start_timestamp = state.get("start_timestamp", None)
167     obj.exec_timestamp = state.get("exec_timestamp", None)
168     obj.end_timestamp = state.get("end_timestamp", None)
169     obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
170     return obj
171
172   def Serialize(self):
173     """Serializes this _QueuedOpCode.
174
175     @rtype: dict
176     @return: the dictionary holding the serialized state
177
178     """
179     return {
180       "input": self.input.__getstate__(),
181       "status": self.status,
182       "result": self.result,
183       "log": self.log,
184       "start_timestamp": self.start_timestamp,
185       "exec_timestamp": self.exec_timestamp,
186       "end_timestamp": self.end_timestamp,
187       "priority": self.priority,
188       }
189
190
191 class _QueuedJob(object):
192   """In-memory job representation.
193
194   This is what we use to track the user-submitted jobs. Locking must
195   be taken care of by users of this class.
196
197   @type queue: L{JobQueue}
198   @ivar queue: the parent queue
199   @ivar id: the job ID
200   @type ops: list
201   @ivar ops: the list of _QueuedOpCode that constitute the job
202   @type log_serial: int
203   @ivar log_serial: holds the index for the next log entry
204   @ivar received_timestamp: the timestamp for when the job was received
205   @ivar start_timestmap: the timestamp for start of execution
206   @ivar end_timestamp: the timestamp for end of execution
207   @ivar writable: Whether the job is allowed to be modified
208
209   """
210   # pylint: disable=W0212
211   __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
212                "received_timestamp", "start_timestamp", "end_timestamp",
213                "__weakref__", "processor_lock", "writable"]
214
215   def __init__(self, queue, job_id, ops, writable):
216     """Constructor for the _QueuedJob.
217
218     @type queue: L{JobQueue}
219     @param queue: our parent queue
220     @type job_id: job_id
221     @param job_id: our job id
222     @type ops: list
223     @param ops: the list of opcodes we hold, which will be encapsulated
224         in _QueuedOpCodes
225     @type writable: bool
226     @param writable: Whether job can be modified
227
228     """
229     if not ops:
230       raise errors.GenericError("A job needs at least one opcode")
231
232     self.queue = queue
233     self.id = int(job_id)
234     self.ops = [_QueuedOpCode(op) for op in ops]
235     self.log_serial = 0
236     self.received_timestamp = TimeStampNow()
237     self.start_timestamp = None
238     self.end_timestamp = None
239
240     self._InitInMemory(self, writable)
241
242   @staticmethod
243   def _InitInMemory(obj, writable):
244     """Initializes in-memory variables.
245
246     """
247     obj.writable = writable
248     obj.ops_iter = None
249     obj.cur_opctx = None
250
251     # Read-only jobs are not processed and therefore don't need a lock
252     if writable:
253       obj.processor_lock = threading.Lock()
254     else:
255       obj.processor_lock = None
256
257   def __repr__(self):
258     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
259               "id=%s" % self.id,
260               "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
261
262     return "<%s at %#x>" % (" ".join(status), id(self))
263
264   @classmethod
265   def Restore(cls, queue, state, writable):
266     """Restore a _QueuedJob from serialized state:
267
268     @type queue: L{JobQueue}
269     @param queue: to which queue the restored job belongs
270     @type state: dict
271     @param state: the serialized state
272     @type writable: bool
273     @param writable: Whether job can be modified
274     @rtype: _JobQueue
275     @return: the restored _JobQueue instance
276
277     """
278     obj = _QueuedJob.__new__(cls)
279     obj.queue = queue
280     obj.id = int(state["id"])
281     obj.received_timestamp = state.get("received_timestamp", None)
282     obj.start_timestamp = state.get("start_timestamp", None)
283     obj.end_timestamp = state.get("end_timestamp", None)
284
285     obj.ops = []
286     obj.log_serial = 0
287     for op_state in state["ops"]:
288       op = _QueuedOpCode.Restore(op_state)
289       for log_entry in op.log:
290         obj.log_serial = max(obj.log_serial, log_entry[0])
291       obj.ops.append(op)
292
293     cls._InitInMemory(obj, writable)
294
295     return obj
296
297   def Serialize(self):
298     """Serialize the _JobQueue instance.
299
300     @rtype: dict
301     @return: the serialized state
302
303     """
304     return {
305       "id": self.id,
306       "ops": [op.Serialize() for op in self.ops],
307       "start_timestamp": self.start_timestamp,
308       "end_timestamp": self.end_timestamp,
309       "received_timestamp": self.received_timestamp,
310       }
311
312   def CalcStatus(self):
313     """Compute the status of this job.
314
315     This function iterates over all the _QueuedOpCodes in the job and
316     based on their status, computes the job status.
317
318     The algorithm is:
319       - if we find a cancelled, or finished with error, the job
320         status will be the same
321       - otherwise, the last opcode with the status one of:
322           - waitlock
323           - canceling
324           - running
325
326         will determine the job status
327
328       - otherwise, it means either all opcodes are queued, or success,
329         and the job status will be the same
330
331     @return: the job status
332
333     """
334     status = constants.JOB_STATUS_QUEUED
335
336     all_success = True
337     for op in self.ops:
338       if op.status == constants.OP_STATUS_SUCCESS:
339         continue
340
341       all_success = False
342
343       if op.status == constants.OP_STATUS_QUEUED:
344         pass
345       elif op.status == constants.OP_STATUS_WAITING:
346         status = constants.JOB_STATUS_WAITING
347       elif op.status == constants.OP_STATUS_RUNNING:
348         status = constants.JOB_STATUS_RUNNING
349       elif op.status == constants.OP_STATUS_CANCELING:
350         status = constants.JOB_STATUS_CANCELING
351         break
352       elif op.status == constants.OP_STATUS_ERROR:
353         status = constants.JOB_STATUS_ERROR
354         # The whole job fails if one opcode failed
355         break
356       elif op.status == constants.OP_STATUS_CANCELED:
357         status = constants.OP_STATUS_CANCELED
358         break
359
360     if all_success:
361       status = constants.JOB_STATUS_SUCCESS
362
363     return status
364
365   def CalcPriority(self):
366     """Gets the current priority for this job.
367
368     Only unfinished opcodes are considered. When all are done, the default
369     priority is used.
370
371     @rtype: int
372
373     """
374     priorities = [op.priority for op in self.ops
375                   if op.status not in constants.OPS_FINALIZED]
376
377     if not priorities:
378       # All opcodes are done, assume default priority
379       return constants.OP_PRIO_DEFAULT
380
381     return min(priorities)
382
383   def GetLogEntries(self, newer_than):
384     """Selectively returns the log entries.
385
386     @type newer_than: None or int
387     @param newer_than: if this is None, return all log entries,
388         otherwise return only the log entries with serial higher
389         than this value
390     @rtype: list
391     @return: the list of the log entries selected
392
393     """
394     if newer_than is None:
395       serial = -1
396     else:
397       serial = newer_than
398
399     entries = []
400     for op in self.ops:
401       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
402
403     return entries
404
405   def GetInfo(self, fields):
406     """Returns information about a job.
407
408     @type fields: list
409     @param fields: names of fields to return
410     @rtype: list
411     @return: list with one element for each field
412     @raise errors.OpExecError: when an invalid field
413         has been passed
414
415     """
416     return _SimpleJobQuery(fields)(self)
417
418   def MarkUnfinishedOps(self, status, result):
419     """Mark unfinished opcodes with a given status and result.
420
421     This is an utility function for marking all running or waiting to
422     be run opcodes with a given status. Opcodes which are already
423     finalised are not changed.
424
425     @param status: a given opcode status
426     @param result: the opcode result
427
428     """
429     not_marked = True
430     for op in self.ops:
431       if op.status in constants.OPS_FINALIZED:
432         assert not_marked, "Finalized opcodes found after non-finalized ones"
433         continue
434       op.status = status
435       op.result = result
436       not_marked = False
437
438   def Finalize(self):
439     """Marks the job as finalized.
440
441     """
442     self.end_timestamp = TimeStampNow()
443
444   def Cancel(self):
445     """Marks job as canceled/-ing if possible.
446
447     @rtype: tuple; (bool, string)
448     @return: Boolean describing whether job was successfully canceled or marked
449       as canceling and a text message
450
451     """
452     status = self.CalcStatus()
453
454     if status == constants.JOB_STATUS_QUEUED:
455       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
456                              "Job canceled by request")
457       self.Finalize()
458       return (True, "Job %s canceled" % self.id)
459
460     elif status == constants.JOB_STATUS_WAITING:
461       # The worker will notice the new status and cancel the job
462       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
463       return (True, "Job %s will be canceled" % self.id)
464
465     else:
466       logging.debug("Job %s is no longer waiting in the queue", self.id)
467       return (False, "Job %s is no longer waiting in the queue" % self.id)
468
469
470 class _OpExecCallbacks(mcpu.OpExecCbBase):
471   def __init__(self, queue, job, op):
472     """Initializes this class.
473
474     @type queue: L{JobQueue}
475     @param queue: Job queue
476     @type job: L{_QueuedJob}
477     @param job: Job object
478     @type op: L{_QueuedOpCode}
479     @param op: OpCode
480
481     """
482     assert queue, "Queue is missing"
483     assert job, "Job is missing"
484     assert op, "Opcode is missing"
485
486     self._queue = queue
487     self._job = job
488     self._op = op
489
490   def _CheckCancel(self):
491     """Raises an exception to cancel the job if asked to.
492
493     """
494     # Cancel here if we were asked to
495     if self._op.status == constants.OP_STATUS_CANCELING:
496       logging.debug("Canceling opcode")
497       raise CancelJob()
498
499   @locking.ssynchronized(_QUEUE, shared=1)
500   def NotifyStart(self):
501     """Mark the opcode as running, not lock-waiting.
502
503     This is called from the mcpu code as a notifier function, when the LU is
504     finally about to start the Exec() method. Of course, to have end-user
505     visible results, the opcode must be initially (before calling into
506     Processor.ExecOpCode) set to OP_STATUS_WAITING.
507
508     """
509     assert self._op in self._job.ops
510     assert self._op.status in (constants.OP_STATUS_WAITING,
511                                constants.OP_STATUS_CANCELING)
512
513     # Cancel here if we were asked to
514     self._CheckCancel()
515
516     logging.debug("Opcode is now running")
517
518     self._op.status = constants.OP_STATUS_RUNNING
519     self._op.exec_timestamp = TimeStampNow()
520
521     # And finally replicate the job status
522     self._queue.UpdateJobUnlocked(self._job)
523
524   @locking.ssynchronized(_QUEUE, shared=1)
525   def _AppendFeedback(self, timestamp, log_type, log_msg):
526     """Internal feedback append function, with locks
527
528     """
529     self._job.log_serial += 1
530     self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
531     self._queue.UpdateJobUnlocked(self._job, replicate=False)
532
533   def Feedback(self, *args):
534     """Append a log entry.
535
536     """
537     assert len(args) < 3
538
539     if len(args) == 1:
540       log_type = constants.ELOG_MESSAGE
541       log_msg = args[0]
542     else:
543       (log_type, log_msg) = args
544
545     # The time is split to make serialization easier and not lose
546     # precision.
547     timestamp = utils.SplitTime(time.time())
548     self._AppendFeedback(timestamp, log_type, log_msg)
549
550   def CheckCancel(self):
551     """Check whether job has been cancelled.
552
553     """
554     assert self._op.status in (constants.OP_STATUS_WAITING,
555                                constants.OP_STATUS_CANCELING)
556
557     # Cancel here if we were asked to
558     self._CheckCancel()
559
560   def SubmitManyJobs(self, jobs):
561     """Submits jobs for processing.
562
563     See L{JobQueue.SubmitManyJobs}.
564
565     """
566     # Locking is done in job queue
567     return self._queue.SubmitManyJobs(jobs)
568
569
570 class _JobChangesChecker(object):
571   def __init__(self, fields, prev_job_info, prev_log_serial):
572     """Initializes this class.
573
574     @type fields: list of strings
575     @param fields: Fields requested by LUXI client
576     @type prev_job_info: string
577     @param prev_job_info: previous job info, as passed by the LUXI client
578     @type prev_log_serial: string
579     @param prev_log_serial: previous job serial, as passed by the LUXI client
580
581     """
582     self._squery = _SimpleJobQuery(fields)
583     self._prev_job_info = prev_job_info
584     self._prev_log_serial = prev_log_serial
585
586   def __call__(self, job):
587     """Checks whether job has changed.
588
589     @type job: L{_QueuedJob}
590     @param job: Job object
591
592     """
593     assert not job.writable, "Expected read-only job"
594
595     status = job.CalcStatus()
596     job_info = self._squery(job)
597     log_entries = job.GetLogEntries(self._prev_log_serial)
598
599     # Serializing and deserializing data can cause type changes (e.g. from
600     # tuple to list) or precision loss. We're doing it here so that we get
601     # the same modifications as the data received from the client. Without
602     # this, the comparison afterwards might fail without the data being
603     # significantly different.
604     # TODO: we just deserialized from disk, investigate how to make sure that
605     # the job info and log entries are compatible to avoid this further step.
606     # TODO: Doing something like in testutils.py:UnifyValueType might be more
607     # efficient, though floats will be tricky
608     job_info = serializer.LoadJson(serializer.DumpJson(job_info))
609     log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
610
611     # Don't even try to wait if the job is no longer running, there will be
612     # no changes.
613     if (status not in (constants.JOB_STATUS_QUEUED,
614                        constants.JOB_STATUS_RUNNING,
615                        constants.JOB_STATUS_WAITING) or
616         job_info != self._prev_job_info or
617         (log_entries and self._prev_log_serial != log_entries[0][0])):
618       logging.debug("Job %s changed", job.id)
619       return (job_info, log_entries)
620
621     return None
622
623
624 class _JobFileChangesWaiter(object):
625   def __init__(self, filename):
626     """Initializes this class.
627
628     @type filename: string
629     @param filename: Path to job file
630     @raises errors.InotifyError: if the notifier cannot be setup
631
632     """
633     self._wm = pyinotify.WatchManager()
634     self._inotify_handler = \
635       asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
636     self._notifier = \
637       pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
638     try:
639       self._inotify_handler.enable()
640     except Exception:
641       # pyinotify doesn't close file descriptors automatically
642       self._notifier.stop()
643       raise
644
645   def _OnInotify(self, notifier_enabled):
646     """Callback for inotify.
647
648     """
649     if not notifier_enabled:
650       self._inotify_handler.enable()
651
652   def Wait(self, timeout):
653     """Waits for the job file to change.
654
655     @type timeout: float
656     @param timeout: Timeout in seconds
657     @return: Whether there have been events
658
659     """
660     assert timeout >= 0
661     have_events = self._notifier.check_events(timeout * 1000)
662     if have_events:
663       self._notifier.read_events()
664     self._notifier.process_events()
665     return have_events
666
667   def Close(self):
668     """Closes underlying notifier and its file descriptor.
669
670     """
671     self._notifier.stop()
672
673
674 class _JobChangesWaiter(object):
675   def __init__(self, filename):
676     """Initializes this class.
677
678     @type filename: string
679     @param filename: Path to job file
680
681     """
682     self._filewaiter = None
683     self._filename = filename
684
685   def Wait(self, timeout):
686     """Waits for a job to change.
687
688     @type timeout: float
689     @param timeout: Timeout in seconds
690     @return: Whether there have been events
691
692     """
693     if self._filewaiter:
694       return self._filewaiter.Wait(timeout)
695
696     # Lazy setup: Avoid inotify setup cost when job file has already changed.
697     # If this point is reached, return immediately and let caller check the job
698     # file again in case there were changes since the last check. This avoids a
699     # race condition.
700     self._filewaiter = _JobFileChangesWaiter(self._filename)
701
702     return True
703
704   def Close(self):
705     """Closes underlying waiter.
706
707     """
708     if self._filewaiter:
709       self._filewaiter.Close()
710
711
712 class _WaitForJobChangesHelper(object):
713   """Helper class using inotify to wait for changes in a job file.
714
715   This class takes a previous job status and serial, and alerts the client when
716   the current job status has changed.
717
718   """
719   @staticmethod
720   def _CheckForChanges(counter, job_load_fn, check_fn):
721     if counter.next() > 0:
722       # If this isn't the first check the job is given some more time to change
723       # again. This gives better performance for jobs generating many
724       # changes/messages.
725       time.sleep(0.1)
726
727     job = job_load_fn()
728     if not job:
729       raise errors.JobLost()
730
731     result = check_fn(job)
732     if result is None:
733       raise utils.RetryAgain()
734
735     return result
736
737   def __call__(self, filename, job_load_fn,
738                fields, prev_job_info, prev_log_serial, timeout):
739     """Waits for changes on a job.
740
741     @type filename: string
742     @param filename: File on which to wait for changes
743     @type job_load_fn: callable
744     @param job_load_fn: Function to load job
745     @type fields: list of strings
746     @param fields: Which fields to check for changes
747     @type prev_job_info: list or None
748     @param prev_job_info: Last job information returned
749     @type prev_log_serial: int
750     @param prev_log_serial: Last job message serial number
751     @type timeout: float
752     @param timeout: maximum time to wait in seconds
753
754     """
755     counter = itertools.count()
756     try:
757       check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
758       waiter = _JobChangesWaiter(filename)
759       try:
760         return utils.Retry(compat.partial(self._CheckForChanges,
761                                           counter, job_load_fn, check_fn),
762                            utils.RETRY_REMAINING_TIME, timeout,
763                            wait_fn=waiter.Wait)
764       finally:
765         waiter.Close()
766     except (errors.InotifyError, errors.JobLost):
767       return None
768     except utils.RetryTimeout:
769       return constants.JOB_NOTCHANGED
770
771
772 def _EncodeOpError(err):
773   """Encodes an error which occurred while processing an opcode.
774
775   """
776   if isinstance(err, errors.GenericError):
777     to_encode = err
778   else:
779     to_encode = errors.OpExecError(str(err))
780
781   return errors.EncodeException(to_encode)
782
783
784 class _TimeoutStrategyWrapper:
785   def __init__(self, fn):
786     """Initializes this class.
787
788     """
789     self._fn = fn
790     self._next = None
791
792   def _Advance(self):
793     """Gets the next timeout if necessary.
794
795     """
796     if self._next is None:
797       self._next = self._fn()
798
799   def Peek(self):
800     """Returns the next timeout.
801
802     """
803     self._Advance()
804     return self._next
805
806   def Next(self):
807     """Returns the current timeout and advances the internal state.
808
809     """
810     self._Advance()
811     result = self._next
812     self._next = None
813     return result
814
815
816 class _OpExecContext:
817   def __init__(self, op, index, log_prefix, timeout_strategy_factory):
818     """Initializes this class.
819
820     """
821     self.op = op
822     self.index = index
823     self.log_prefix = log_prefix
824     self.summary = op.input.Summary()
825
826     # Create local copy to modify
827     if getattr(op.input, opcodes.DEPEND_ATTR, None):
828       self.jobdeps = op.input.depends[:]
829     else:
830       self.jobdeps = None
831
832     self._timeout_strategy_factory = timeout_strategy_factory
833     self._ResetTimeoutStrategy()
834
835   def _ResetTimeoutStrategy(self):
836     """Creates a new timeout strategy.
837
838     """
839     self._timeout_strategy = \
840       _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
841
842   def CheckPriorityIncrease(self):
843     """Checks whether priority can and should be increased.
844
845     Called when locks couldn't be acquired.
846
847     """
848     op = self.op
849
850     # Exhausted all retries and next round should not use blocking acquire
851     # for locks?
852     if (self._timeout_strategy.Peek() is None and
853         op.priority > constants.OP_PRIO_HIGHEST):
854       logging.debug("Increasing priority")
855       op.priority -= 1
856       self._ResetTimeoutStrategy()
857       return True
858
859     return False
860
861   def GetNextLockTimeout(self):
862     """Returns the next lock acquire timeout.
863
864     """
865     return self._timeout_strategy.Next()
866
867
868 class _JobProcessor(object):
869   (DEFER,
870    WAITDEP,
871    FINISHED) = range(1, 4)
872
873   def __init__(self, queue, opexec_fn, job,
874                _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
875     """Initializes this class.
876
877     """
878     self.queue = queue
879     self.opexec_fn = opexec_fn
880     self.job = job
881     self._timeout_strategy_factory = _timeout_strategy_factory
882
883   @staticmethod
884   def _FindNextOpcode(job, timeout_strategy_factory):
885     """Locates the next opcode to run.
886
887     @type job: L{_QueuedJob}
888     @param job: Job object
889     @param timeout_strategy_factory: Callable to create new timeout strategy
890
891     """
892     # Create some sort of a cache to speed up locating next opcode for future
893     # lookups
894     # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
895     # pending and one for processed ops.
896     if job.ops_iter is None:
897       job.ops_iter = enumerate(job.ops)
898
899     # Find next opcode to run
900     while True:
901       try:
902         (idx, op) = job.ops_iter.next()
903       except StopIteration:
904         raise errors.ProgrammerError("Called for a finished job")
905
906       if op.status == constants.OP_STATUS_RUNNING:
907         # Found an opcode already marked as running
908         raise errors.ProgrammerError("Called for job marked as running")
909
910       opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
911                              timeout_strategy_factory)
912
913       if op.status not in constants.OPS_FINALIZED:
914         return opctx
915
916       # This is a job that was partially completed before master daemon
917       # shutdown, so it can be expected that some opcodes are already
918       # completed successfully (if any did error out, then the whole job
919       # should have been aborted and not resubmitted for processing).
920       logging.info("%s: opcode %s already processed, skipping",
921                    opctx.log_prefix, opctx.summary)
922
923   @staticmethod
924   def _MarkWaitlock(job, op):
925     """Marks an opcode as waiting for locks.
926
927     The job's start timestamp is also set if necessary.
928
929     @type job: L{_QueuedJob}
930     @param job: Job object
931     @type op: L{_QueuedOpCode}
932     @param op: Opcode object
933
934     """
935     assert op in job.ops
936     assert op.status in (constants.OP_STATUS_QUEUED,
937                          constants.OP_STATUS_WAITING)
938
939     update = False
940
941     op.result = None
942
943     if op.status == constants.OP_STATUS_QUEUED:
944       op.status = constants.OP_STATUS_WAITING
945       update = True
946
947     if op.start_timestamp is None:
948       op.start_timestamp = TimeStampNow()
949       update = True
950
951     if job.start_timestamp is None:
952       job.start_timestamp = op.start_timestamp
953       update = True
954
955     assert op.status == constants.OP_STATUS_WAITING
956
957     return update
958
959   @staticmethod
960   def _CheckDependencies(queue, job, opctx):
961     """Checks if an opcode has dependencies and if so, processes them.
962
963     @type queue: L{JobQueue}
964     @param queue: Queue object
965     @type job: L{_QueuedJob}
966     @param job: Job object
967     @type opctx: L{_OpExecContext}
968     @param opctx: Opcode execution context
969     @rtype: bool
970     @return: Whether opcode will be re-scheduled by dependency tracker
971
972     """
973     op = opctx.op
974
975     result = False
976
977     while opctx.jobdeps:
978       (dep_job_id, dep_status) = opctx.jobdeps[0]
979
980       (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
981                                                           dep_status)
982       assert ht.TNonEmptyString(depmsg), "No dependency message"
983
984       logging.info("%s: %s", opctx.log_prefix, depmsg)
985
986       if depresult == _JobDependencyManager.CONTINUE:
987         # Remove dependency and continue
988         opctx.jobdeps.pop(0)
989
990       elif depresult == _JobDependencyManager.WAIT:
991         # Need to wait for notification, dependency tracker will re-add job
992         # to workerpool
993         result = True
994         break
995
996       elif depresult == _JobDependencyManager.CANCEL:
997         # Job was cancelled, cancel this job as well
998         job.Cancel()
999         assert op.status == constants.OP_STATUS_CANCELING
1000         break
1001
1002       elif depresult in (_JobDependencyManager.WRONGSTATUS,
1003                          _JobDependencyManager.ERROR):
1004         # Job failed or there was an error, this job must fail
1005         op.status = constants.OP_STATUS_ERROR
1006         op.result = _EncodeOpError(errors.OpExecError(depmsg))
1007         break
1008
1009       else:
1010         raise errors.ProgrammerError("Unknown dependency result '%s'" %
1011                                      depresult)
1012
1013     return result
1014
1015   def _ExecOpCodeUnlocked(self, opctx):
1016     """Processes one opcode and returns the result.
1017
1018     """
1019     op = opctx.op
1020
1021     assert op.status == constants.OP_STATUS_WAITING
1022
1023     timeout = opctx.GetNextLockTimeout()
1024
1025     try:
1026       # Make sure not to hold queue lock while calling ExecOpCode
1027       result = self.opexec_fn(op.input,
1028                               _OpExecCallbacks(self.queue, self.job, op),
1029                               timeout=timeout, priority=op.priority)
1030     except mcpu.LockAcquireTimeout:
1031       assert timeout is not None, "Received timeout for blocking acquire"
1032       logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1033
1034       assert op.status in (constants.OP_STATUS_WAITING,
1035                            constants.OP_STATUS_CANCELING)
1036
1037       # Was job cancelled while we were waiting for the lock?
1038       if op.status == constants.OP_STATUS_CANCELING:
1039         return (constants.OP_STATUS_CANCELING, None)
1040
1041       # Stay in waitlock while trying to re-acquire lock
1042       return (constants.OP_STATUS_WAITING, None)
1043     except CancelJob:
1044       logging.exception("%s: Canceling job", opctx.log_prefix)
1045       assert op.status == constants.OP_STATUS_CANCELING
1046       return (constants.OP_STATUS_CANCELING, None)
1047     except Exception, err: # pylint: disable=W0703
1048       logging.exception("%s: Caught exception in %s",
1049                         opctx.log_prefix, opctx.summary)
1050       return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1051     else:
1052       logging.debug("%s: %s successful",
1053                     opctx.log_prefix, opctx.summary)
1054       return (constants.OP_STATUS_SUCCESS, result)
1055
1056   def __call__(self, _nextop_fn=None):
1057     """Continues execution of a job.
1058
1059     @param _nextop_fn: Callback function for tests
1060     @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1061       be deferred and C{WAITDEP} if the dependency manager
1062       (L{_JobDependencyManager}) will re-schedule the job when appropriate
1063
1064     """
1065     queue = self.queue
1066     job = self.job
1067
1068     logging.debug("Processing job %s", job.id)
1069
1070     queue.acquire(shared=1)
1071     try:
1072       opcount = len(job.ops)
1073
1074       assert job.writable, "Expected writable job"
1075
1076       # Don't do anything for finalized jobs
1077       if job.CalcStatus() in constants.JOBS_FINALIZED:
1078         return self.FINISHED
1079
1080       # Is a previous opcode still pending?
1081       if job.cur_opctx:
1082         opctx = job.cur_opctx
1083         job.cur_opctx = None
1084       else:
1085         if __debug__ and _nextop_fn:
1086           _nextop_fn()
1087         opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1088
1089       op = opctx.op
1090
1091       # Consistency check
1092       assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1093                                      constants.OP_STATUS_CANCELING)
1094                         for i in job.ops[opctx.index + 1:])
1095
1096       assert op.status in (constants.OP_STATUS_QUEUED,
1097                            constants.OP_STATUS_WAITING,
1098                            constants.OP_STATUS_CANCELING)
1099
1100       assert (op.priority <= constants.OP_PRIO_LOWEST and
1101               op.priority >= constants.OP_PRIO_HIGHEST)
1102
1103       waitjob = None
1104
1105       if op.status != constants.OP_STATUS_CANCELING:
1106         assert op.status in (constants.OP_STATUS_QUEUED,
1107                              constants.OP_STATUS_WAITING)
1108
1109         # Prepare to start opcode
1110         if self._MarkWaitlock(job, op):
1111           # Write to disk
1112           queue.UpdateJobUnlocked(job)
1113
1114         assert op.status == constants.OP_STATUS_WAITING
1115         assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1116         assert job.start_timestamp and op.start_timestamp
1117         assert waitjob is None
1118
1119         # Check if waiting for a job is necessary
1120         waitjob = self._CheckDependencies(queue, job, opctx)
1121
1122         assert op.status in (constants.OP_STATUS_WAITING,
1123                              constants.OP_STATUS_CANCELING,
1124                              constants.OP_STATUS_ERROR)
1125
1126         if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1127                                          constants.OP_STATUS_ERROR)):
1128           logging.info("%s: opcode %s waiting for locks",
1129                        opctx.log_prefix, opctx.summary)
1130
1131           assert not opctx.jobdeps, "Not all dependencies were removed"
1132
1133           queue.release()
1134           try:
1135             (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1136           finally:
1137             queue.acquire(shared=1)
1138
1139           op.status = op_status
1140           op.result = op_result
1141
1142           assert not waitjob
1143
1144         if op.status == constants.OP_STATUS_WAITING:
1145           # Couldn't get locks in time
1146           assert not op.end_timestamp
1147         else:
1148           # Finalize opcode
1149           op.end_timestamp = TimeStampNow()
1150
1151           if op.status == constants.OP_STATUS_CANCELING:
1152             assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1153                                   for i in job.ops[opctx.index:])
1154           else:
1155             assert op.status in constants.OPS_FINALIZED
1156
1157       if op.status == constants.OP_STATUS_WAITING or waitjob:
1158         finalize = False
1159
1160         if not waitjob and opctx.CheckPriorityIncrease():
1161           # Priority was changed, need to update on-disk file
1162           queue.UpdateJobUnlocked(job)
1163
1164         # Keep around for another round
1165         job.cur_opctx = opctx
1166
1167         assert (op.priority <= constants.OP_PRIO_LOWEST and
1168                 op.priority >= constants.OP_PRIO_HIGHEST)
1169
1170         # In no case must the status be finalized here
1171         assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1172
1173       else:
1174         # Ensure all opcodes so far have been successful
1175         assert (opctx.index == 0 or
1176                 compat.all(i.status == constants.OP_STATUS_SUCCESS
1177                            for i in job.ops[:opctx.index]))
1178
1179         # Reset context
1180         job.cur_opctx = None
1181
1182         if op.status == constants.OP_STATUS_SUCCESS:
1183           finalize = False
1184
1185         elif op.status == constants.OP_STATUS_ERROR:
1186           # Ensure failed opcode has an exception as its result
1187           assert errors.GetEncodedError(job.ops[opctx.index].result)
1188
1189           to_encode = errors.OpExecError("Preceding opcode failed")
1190           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1191                                 _EncodeOpError(to_encode))
1192           finalize = True
1193
1194           # Consistency check
1195           assert compat.all(i.status == constants.OP_STATUS_ERROR and
1196                             errors.GetEncodedError(i.result)
1197                             for i in job.ops[opctx.index:])
1198
1199         elif op.status == constants.OP_STATUS_CANCELING:
1200           job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1201                                 "Job canceled by request")
1202           finalize = True
1203
1204         else:
1205           raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1206
1207         if opctx.index == (opcount - 1):
1208           # Finalize on last opcode
1209           finalize = True
1210
1211         if finalize:
1212           # All opcodes have been run, finalize job
1213           job.Finalize()
1214
1215         # Write to disk. If the job status is final, this is the final write
1216         # allowed. Once the file has been written, it can be archived anytime.
1217         queue.UpdateJobUnlocked(job)
1218
1219         assert not waitjob
1220
1221         if finalize:
1222           logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1223           return self.FINISHED
1224
1225       assert not waitjob or queue.depmgr.JobWaiting(job)
1226
1227       if waitjob:
1228         return self.WAITDEP
1229       else:
1230         return self.DEFER
1231     finally:
1232       assert job.writable, "Job became read-only while being processed"
1233       queue.release()
1234
1235
1236 def _EvaluateJobProcessorResult(depmgr, job, result):
1237   """Looks at a result from L{_JobProcessor} for a job.
1238
1239   To be used in a L{_JobQueueWorker}.
1240
1241   """
1242   if result == _JobProcessor.FINISHED:
1243     # Notify waiting jobs
1244     depmgr.NotifyWaiters(job.id)
1245
1246   elif result == _JobProcessor.DEFER:
1247     # Schedule again
1248     raise workerpool.DeferTask(priority=job.CalcPriority())
1249
1250   elif result == _JobProcessor.WAITDEP:
1251     # No-op, dependency manager will re-schedule
1252     pass
1253
1254   else:
1255     raise errors.ProgrammerError("Job processor returned unknown status %s" %
1256                                  (result, ))
1257
1258
1259 class _JobQueueWorker(workerpool.BaseWorker):
1260   """The actual job workers.
1261
1262   """
1263   def RunTask(self, job): # pylint: disable=W0221
1264     """Job executor.
1265
1266     @type job: L{_QueuedJob}
1267     @param job: the job to be processed
1268
1269     """
1270     assert job.writable, "Expected writable job"
1271
1272     # Ensure only one worker is active on a single job. If a job registers for
1273     # a dependency job, and the other job notifies before the first worker is
1274     # done, the job can end up in the tasklist more than once.
1275     job.processor_lock.acquire()
1276     try:
1277       return self._RunTaskInner(job)
1278     finally:
1279       job.processor_lock.release()
1280
1281   def _RunTaskInner(self, job):
1282     """Executes a job.
1283
1284     Must be called with per-job lock acquired.
1285
1286     """
1287     queue = job.queue
1288     assert queue == self.pool.queue
1289
1290     setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1291     setname_fn(None)
1292
1293     proc = mcpu.Processor(queue.context, job.id)
1294
1295     # Create wrapper for setting thread name
1296     wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1297                                     proc.ExecOpCode)
1298
1299     _EvaluateJobProcessorResult(queue.depmgr, job,
1300                                 _JobProcessor(queue, wrap_execop_fn, job)())
1301
1302   @staticmethod
1303   def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1304     """Updates the worker thread name to include a short summary of the opcode.
1305
1306     @param setname_fn: Callable setting worker thread name
1307     @param execop_fn: Callable for executing opcode (usually
1308                       L{mcpu.Processor.ExecOpCode})
1309
1310     """
1311     setname_fn(op)
1312     try:
1313       return execop_fn(op, *args, **kwargs)
1314     finally:
1315       setname_fn(None)
1316
1317   @staticmethod
1318   def _GetWorkerName(job, op):
1319     """Sets the worker thread name.
1320
1321     @type job: L{_QueuedJob}
1322     @type op: L{opcodes.OpCode}
1323
1324     """
1325     parts = ["Job%s" % job.id]
1326
1327     if op:
1328       parts.append(op.TinySummary())
1329
1330     return "/".join(parts)
1331
1332
1333 class _JobQueueWorkerPool(workerpool.WorkerPool):
1334   """Simple class implementing a job-processing workerpool.
1335
1336   """
1337   def __init__(self, queue):
1338     super(_JobQueueWorkerPool, self).__init__("Jq",
1339                                               JOBQUEUE_THREADS,
1340                                               _JobQueueWorker)
1341     self.queue = queue
1342
1343
1344 class _JobDependencyManager:
1345   """Keeps track of job dependencies.
1346
1347   """
1348   (WAIT,
1349    ERROR,
1350    CANCEL,
1351    CONTINUE,
1352    WRONGSTATUS) = range(1, 6)
1353
1354   def __init__(self, getstatus_fn, enqueue_fn):
1355     """Initializes this class.
1356
1357     """
1358     self._getstatus_fn = getstatus_fn
1359     self._enqueue_fn = enqueue_fn
1360
1361     self._waiters = {}
1362     self._lock = locking.SharedLock("JobDepMgr")
1363
1364   @locking.ssynchronized(_LOCK, shared=1)
1365   def GetLockInfo(self, requested): # pylint: disable=W0613
1366     """Retrieves information about waiting jobs.
1367
1368     @type requested: set
1369     @param requested: Requested information, see C{query.LQ_*}
1370
1371     """
1372     # No need to sort here, that's being done by the lock manager and query
1373     # library. There are no priorities for notifying jobs, hence all show up as
1374     # one item under "pending".
1375     return [("job/%s" % job_id, None, None,
1376              [("job", [job.id for job in waiters])])
1377             for job_id, waiters in self._waiters.items()
1378             if waiters]
1379
1380   @locking.ssynchronized(_LOCK, shared=1)
1381   def JobWaiting(self, job):
1382     """Checks if a job is waiting.
1383
1384     """
1385     return compat.any(job in jobs
1386                       for jobs in self._waiters.values())
1387
1388   @locking.ssynchronized(_LOCK)
1389   def CheckAndRegister(self, job, dep_job_id, dep_status):
1390     """Checks if a dependency job has the requested status.
1391
1392     If the other job is not yet in a finalized status, the calling job will be
1393     notified (re-added to the workerpool) at a later point.
1394
1395     @type job: L{_QueuedJob}
1396     @param job: Job object
1397     @type dep_job_id: int
1398     @param dep_job_id: ID of dependency job
1399     @type dep_status: list
1400     @param dep_status: Required status
1401
1402     """
1403     assert ht.TJobId(job.id)
1404     assert ht.TJobId(dep_job_id)
1405     assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1406
1407     if job.id == dep_job_id:
1408       return (self.ERROR, "Job can't depend on itself")
1409
1410     # Get status of dependency job
1411     try:
1412       status = self._getstatus_fn(dep_job_id)
1413     except errors.JobLost, err:
1414       return (self.ERROR, "Dependency error: %s" % err)
1415
1416     assert status in constants.JOB_STATUS_ALL
1417
1418     job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1419
1420     if status not in constants.JOBS_FINALIZED:
1421       # Register for notification and wait for job to finish
1422       job_id_waiters.add(job)
1423       return (self.WAIT,
1424               "Need to wait for job %s, wanted status '%s'" %
1425               (dep_job_id, dep_status))
1426
1427     # Remove from waiters list
1428     if job in job_id_waiters:
1429       job_id_waiters.remove(job)
1430
1431     if (status == constants.JOB_STATUS_CANCELED and
1432         constants.JOB_STATUS_CANCELED not in dep_status):
1433       return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1434
1435     elif not dep_status or status in dep_status:
1436       return (self.CONTINUE,
1437               "Dependency job %s finished with status '%s'" %
1438               (dep_job_id, status))
1439
1440     else:
1441       return (self.WRONGSTATUS,
1442               "Dependency job %s finished with status '%s',"
1443               " not one of '%s' as required" %
1444               (dep_job_id, status, utils.CommaJoin(dep_status)))
1445
1446   def _RemoveEmptyWaitersUnlocked(self):
1447     """Remove all jobs without actual waiters.
1448
1449     """
1450     for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1451                    if not waiters]:
1452       del self._waiters[job_id]
1453
1454   def NotifyWaiters(self, job_id):
1455     """Notifies all jobs waiting for a certain job ID.
1456
1457     @attention: Do not call until L{CheckAndRegister} returned a status other
1458       than C{WAITDEP} for C{job_id}, or behaviour is undefined
1459     @type job_id: int
1460     @param job_id: Job ID
1461
1462     """
1463     assert ht.TJobId(job_id)
1464
1465     self._lock.acquire()
1466     try:
1467       self._RemoveEmptyWaitersUnlocked()
1468
1469       jobs = self._waiters.pop(job_id, None)
1470     finally:
1471       self._lock.release()
1472
1473     if jobs:
1474       # Re-add jobs to workerpool
1475       logging.debug("Re-adding %s jobs which were waiting for job %s",
1476                     len(jobs), job_id)
1477       self._enqueue_fn(jobs)
1478
1479
1480 def _RequireOpenQueue(fn):
1481   """Decorator for "public" functions.
1482
1483   This function should be used for all 'public' functions. That is,
1484   functions usually called from other classes. Note that this should
1485   be applied only to methods (not plain functions), since it expects
1486   that the decorated function is called with a first argument that has
1487   a '_queue_filelock' argument.
1488
1489   @warning: Use this decorator only after locking.ssynchronized
1490
1491   Example::
1492     @locking.ssynchronized(_LOCK)
1493     @_RequireOpenQueue
1494     def Example(self):
1495       pass
1496
1497   """
1498   def wrapper(self, *args, **kwargs):
1499     # pylint: disable=W0212
1500     assert self._queue_filelock is not None, "Queue should be open"
1501     return fn(self, *args, **kwargs)
1502   return wrapper
1503
1504
1505 def _RequireNonDrainedQueue(fn):
1506   """Decorator checking for a non-drained queue.
1507
1508   To be used with functions submitting new jobs.
1509
1510   """
1511   def wrapper(self, *args, **kwargs):
1512     """Wrapper function.
1513
1514     @raise errors.JobQueueDrainError: if the job queue is marked for draining
1515
1516     """
1517     # Ok when sharing the big job queue lock, as the drain file is created when
1518     # the lock is exclusive.
1519     # Needs access to protected member, pylint: disable=W0212
1520     if self._drained:
1521       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1522
1523     if not self._accepting_jobs:
1524       raise errors.JobQueueError("Job queue is shutting down, refusing job")
1525
1526     return fn(self, *args, **kwargs)
1527   return wrapper
1528
1529
1530 class JobQueue(object):
1531   """Queue used to manage the jobs.
1532
1533   """
1534   def __init__(self, context):
1535     """Constructor for JobQueue.
1536
1537     The constructor will initialize the job queue object and then
1538     start loading the current jobs from disk, either for starting them
1539     (if they were queue) or for aborting them (if they were already
1540     running).
1541
1542     @type context: GanetiContext
1543     @param context: the context object for access to the configuration
1544         data and other ganeti objects
1545
1546     """
1547     self.context = context
1548     self._memcache = weakref.WeakValueDictionary()
1549     self._my_hostname = netutils.Hostname.GetSysName()
1550
1551     # The Big JobQueue lock. If a code block or method acquires it in shared
1552     # mode safe it must guarantee concurrency with all the code acquiring it in
1553     # shared mode, including itself. In order not to acquire it at all
1554     # concurrency must be guaranteed with all code acquiring it in shared mode
1555     # and all code acquiring it exclusively.
1556     self._lock = locking.SharedLock("JobQueue")
1557
1558     self.acquire = self._lock.acquire
1559     self.release = self._lock.release
1560
1561     # Accept jobs by default
1562     self._accepting_jobs = True
1563
1564     # Initialize the queue, and acquire the filelock.
1565     # This ensures no other process is working on the job queue.
1566     self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1567
1568     # Read serial file
1569     self._last_serial = jstore.ReadSerial()
1570     assert self._last_serial is not None, ("Serial file was modified between"
1571                                            " check in jstore and here")
1572
1573     # Get initial list of nodes
1574     self._nodes = dict((n.name, n.primary_ip)
1575                        for n in self.context.cfg.GetAllNodesInfo().values()
1576                        if n.master_candidate)
1577
1578     # Remove master node
1579     self._nodes.pop(self._my_hostname, None)
1580
1581     # TODO: Check consistency across nodes
1582
1583     self._queue_size = None
1584     self._UpdateQueueSizeUnlocked()
1585     assert ht.TInt(self._queue_size)
1586     self._drained = jstore.CheckDrainFlag()
1587
1588     # Job dependencies
1589     self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1590                                         self._EnqueueJobs)
1591     self.context.glm.AddToLockMonitor(self.depmgr)
1592
1593     # Setup worker pool
1594     self._wpool = _JobQueueWorkerPool(self)
1595     try:
1596       self._InspectQueue()
1597     except:
1598       self._wpool.TerminateWorkers()
1599       raise
1600
1601   @locking.ssynchronized(_LOCK)
1602   @_RequireOpenQueue
1603   def _InspectQueue(self):
1604     """Loads the whole job queue and resumes unfinished jobs.
1605
1606     This function needs the lock here because WorkerPool.AddTask() may start a
1607     job while we're still doing our work.
1608
1609     """
1610     logging.info("Inspecting job queue")
1611
1612     restartjobs = []
1613
1614     all_job_ids = self._GetJobIDsUnlocked()
1615     jobs_count = len(all_job_ids)
1616     lastinfo = time.time()
1617     for idx, job_id in enumerate(all_job_ids):
1618       # Give an update every 1000 jobs or 10 seconds
1619       if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1620           idx == (jobs_count - 1)):
1621         logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1622                      idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1623         lastinfo = time.time()
1624
1625       job = self._LoadJobUnlocked(job_id)
1626
1627       # a failure in loading the job can cause 'None' to be returned
1628       if job is None:
1629         continue
1630
1631       status = job.CalcStatus()
1632
1633       if status == constants.JOB_STATUS_QUEUED:
1634         restartjobs.append(job)
1635
1636       elif status in (constants.JOB_STATUS_RUNNING,
1637                       constants.JOB_STATUS_WAITING,
1638                       constants.JOB_STATUS_CANCELING):
1639         logging.warning("Unfinished job %s found: %s", job.id, job)
1640
1641         if status == constants.JOB_STATUS_WAITING:
1642           # Restart job
1643           job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1644           restartjobs.append(job)
1645         else:
1646           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1647                                 "Unclean master daemon shutdown")
1648           job.Finalize()
1649
1650         self.UpdateJobUnlocked(job)
1651
1652     if restartjobs:
1653       logging.info("Restarting %s jobs", len(restartjobs))
1654       self._EnqueueJobsUnlocked(restartjobs)
1655
1656     logging.info("Job queue inspection finished")
1657
1658   def _GetRpc(self, address_list):
1659     """Gets RPC runner with context.
1660
1661     """
1662     return rpc.JobQueueRunner(self.context, address_list)
1663
1664   @locking.ssynchronized(_LOCK)
1665   @_RequireOpenQueue
1666   def AddNode(self, node):
1667     """Register a new node with the queue.
1668
1669     @type node: L{objects.Node}
1670     @param node: the node object to be added
1671
1672     """
1673     node_name = node.name
1674     assert node_name != self._my_hostname
1675
1676     # Clean queue directory on added node
1677     result = self._GetRpc(None).call_jobqueue_purge(node_name)
1678     msg = result.fail_msg
1679     if msg:
1680       logging.warning("Cannot cleanup queue directory on node %s: %s",
1681                       node_name, msg)
1682
1683     if not node.master_candidate:
1684       # remove if existing, ignoring errors
1685       self._nodes.pop(node_name, None)
1686       # and skip the replication of the job ids
1687       return
1688
1689     # Upload the whole queue excluding archived jobs
1690     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1691
1692     # Upload current serial file
1693     files.append(pathutils.JOB_QUEUE_SERIAL_FILE)
1694
1695     # Static address list
1696     addrs = [node.primary_ip]
1697
1698     for file_name in files:
1699       # Read file content
1700       content = utils.ReadFile(file_name)
1701
1702       result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
1703                              file_name, content)
1704       msg = result[node_name].fail_msg
1705       if msg:
1706         logging.error("Failed to upload file %s to node %s: %s",
1707                       file_name, node_name, msg)
1708
1709     self._nodes[node_name] = node.primary_ip
1710
1711   @locking.ssynchronized(_LOCK)
1712   @_RequireOpenQueue
1713   def RemoveNode(self, node_name):
1714     """Callback called when removing nodes from the cluster.
1715
1716     @type node_name: str
1717     @param node_name: the name of the node to remove
1718
1719     """
1720     self._nodes.pop(node_name, None)
1721
1722   @staticmethod
1723   def _CheckRpcResult(result, nodes, failmsg):
1724     """Verifies the status of an RPC call.
1725
1726     Since we aim to keep consistency should this node (the current
1727     master) fail, we will log errors if our rpc fail, and especially
1728     log the case when more than half of the nodes fails.
1729
1730     @param result: the data as returned from the rpc call
1731     @type nodes: list
1732     @param nodes: the list of nodes we made the call to
1733     @type failmsg: str
1734     @param failmsg: the identifier to be used for logging
1735
1736     """
1737     failed = []
1738     success = []
1739
1740     for node in nodes:
1741       msg = result[node].fail_msg
1742       if msg:
1743         failed.append(node)
1744         logging.error("RPC call %s (%s) failed on node %s: %s",
1745                       result[node].call, failmsg, node, msg)
1746       else:
1747         success.append(node)
1748
1749     # +1 for the master node
1750     if (len(success) + 1) < len(failed):
1751       # TODO: Handle failing nodes
1752       logging.error("More than half of the nodes failed")
1753
1754   def _GetNodeIp(self):
1755     """Helper for returning the node name/ip list.
1756
1757     @rtype: (list, list)
1758     @return: a tuple of two lists, the first one with the node
1759         names and the second one with the node addresses
1760
1761     """
1762     # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1763     name_list = self._nodes.keys()
1764     addr_list = [self._nodes[name] for name in name_list]
1765     return name_list, addr_list
1766
1767   def _UpdateJobQueueFile(self, file_name, data, replicate):
1768     """Writes a file locally and then replicates it to all nodes.
1769
1770     This function will replace the contents of a file on the local
1771     node and then replicate it to all the other nodes we have.
1772
1773     @type file_name: str
1774     @param file_name: the path of the file to be replicated
1775     @type data: str
1776     @param data: the new contents of the file
1777     @type replicate: boolean
1778     @param replicate: whether to spread the changes to the remote nodes
1779
1780     """
1781     getents = runtime.GetEnts()
1782     utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1783                     gid=getents.masterd_gid)
1784
1785     if replicate:
1786       names, addrs = self._GetNodeIp()
1787       result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
1788       self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1789
1790   def _RenameFilesUnlocked(self, rename):
1791     """Renames a file locally and then replicate the change.
1792
1793     This function will rename a file in the local queue directory
1794     and then replicate this rename to all the other nodes we have.
1795
1796     @type rename: list of (old, new)
1797     @param rename: List containing tuples mapping old to new names
1798
1799     """
1800     # Rename them locally
1801     for old, new in rename:
1802       utils.RenameFile(old, new, mkdir=True)
1803
1804     # ... and on all nodes
1805     names, addrs = self._GetNodeIp()
1806     result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
1807     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1808
1809   def _NewSerialsUnlocked(self, count):
1810     """Generates a new job identifier.
1811
1812     Job identifiers are unique during the lifetime of a cluster.
1813
1814     @type count: integer
1815     @param count: how many serials to return
1816     @rtype: list of int
1817     @return: a list of job identifiers.
1818
1819     """
1820     assert ht.TPositiveInt(count)
1821
1822     # New number
1823     serial = self._last_serial + count
1824
1825     # Write to file
1826     self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
1827                              "%s\n" % serial, True)
1828
1829     result = [jstore.FormatJobID(v)
1830               for v in range(self._last_serial + 1, serial + 1)]
1831
1832     # Keep it only if we were able to write the file
1833     self._last_serial = serial
1834
1835     assert len(result) == count
1836
1837     return result
1838
1839   @staticmethod
1840   def _GetJobPath(job_id):
1841     """Returns the job file for a given job id.
1842
1843     @type job_id: str
1844     @param job_id: the job identifier
1845     @rtype: str
1846     @return: the path to the job file
1847
1848     """
1849     return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
1850
1851   @staticmethod
1852   def _GetArchivedJobPath(job_id):
1853     """Returns the archived job file for a give job id.
1854
1855     @type job_id: str
1856     @param job_id: the job identifier
1857     @rtype: str
1858     @return: the path to the archived job file
1859
1860     """
1861     return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
1862                           jstore.GetArchiveDirectory(job_id),
1863                           "job-%s" % job_id)
1864
1865   @staticmethod
1866   def _GetJobIDsUnlocked(sort=True):
1867     """Return all known job IDs.
1868
1869     The method only looks at disk because it's a requirement that all
1870     jobs are present on disk (so in the _memcache we don't have any
1871     extra IDs).
1872
1873     @type sort: boolean
1874     @param sort: perform sorting on the returned job ids
1875     @rtype: list
1876     @return: the list of job IDs
1877
1878     """
1879     jlist = []
1880     for filename in utils.ListVisibleFiles(pathutils.QUEUE_DIR):
1881       m = constants.JOB_FILE_RE.match(filename)
1882       if m:
1883         jlist.append(int(m.group(1)))
1884     if sort:
1885       jlist.sort()
1886     return jlist
1887
1888   def _LoadJobUnlocked(self, job_id):
1889     """Loads a job from the disk or memory.
1890
1891     Given a job id, this will return the cached job object if
1892     existing, or try to load the job from the disk. If loading from
1893     disk, it will also add the job to the cache.
1894
1895     @type job_id: int
1896     @param job_id: the job id
1897     @rtype: L{_QueuedJob} or None
1898     @return: either None or the job object
1899
1900     """
1901     job = self._memcache.get(job_id, None)
1902     if job:
1903       logging.debug("Found job %s in memcache", job_id)
1904       assert job.writable, "Found read-only job in memcache"
1905       return job
1906
1907     try:
1908       job = self._LoadJobFromDisk(job_id, False)
1909       if job is None:
1910         return job
1911     except errors.JobFileCorrupted:
1912       old_path = self._GetJobPath(job_id)
1913       new_path = self._GetArchivedJobPath(job_id)
1914       if old_path == new_path:
1915         # job already archived (future case)
1916         logging.exception("Can't parse job %s", job_id)
1917       else:
1918         # non-archived case
1919         logging.exception("Can't parse job %s, will archive.", job_id)
1920         self._RenameFilesUnlocked([(old_path, new_path)])
1921       return None
1922
1923     assert job.writable, "Job just loaded is not writable"
1924
1925     self._memcache[job_id] = job
1926     logging.debug("Added job %s to the cache", job_id)
1927     return job
1928
1929   def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
1930     """Load the given job file from disk.
1931
1932     Given a job file, read, load and restore it in a _QueuedJob format.
1933
1934     @type job_id: int
1935     @param job_id: job identifier
1936     @type try_archived: bool
1937     @param try_archived: Whether to try loading an archived job
1938     @rtype: L{_QueuedJob} or None
1939     @return: either None or the job object
1940
1941     """
1942     path_functions = [(self._GetJobPath, True)]
1943
1944     if try_archived:
1945       path_functions.append((self._GetArchivedJobPath, False))
1946
1947     raw_data = None
1948     writable_default = None
1949
1950     for (fn, writable_default) in path_functions:
1951       filepath = fn(job_id)
1952       logging.debug("Loading job from %s", filepath)
1953       try:
1954         raw_data = utils.ReadFile(filepath)
1955       except EnvironmentError, err:
1956         if err.errno != errno.ENOENT:
1957           raise
1958       else:
1959         break
1960
1961     if not raw_data:
1962       return None
1963
1964     if writable is None:
1965       writable = writable_default
1966
1967     try:
1968       data = serializer.LoadJson(raw_data)
1969       job = _QueuedJob.Restore(self, data, writable)
1970     except Exception, err: # pylint: disable=W0703
1971       raise errors.JobFileCorrupted(err)
1972
1973     return job
1974
1975   def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
1976     """Load the given job file from disk.
1977
1978     Given a job file, read, load and restore it in a _QueuedJob format.
1979     In case of error reading the job, it gets returned as None, and the
1980     exception is logged.
1981
1982     @type job_id: int
1983     @param job_id: job identifier
1984     @type try_archived: bool
1985     @param try_archived: Whether to try loading an archived job
1986     @rtype: L{_QueuedJob} or None
1987     @return: either None or the job object
1988
1989     """
1990     try:
1991       return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
1992     except (errors.JobFileCorrupted, EnvironmentError):
1993       logging.exception("Can't load/parse job %s", job_id)
1994       return None
1995
1996   def _UpdateQueueSizeUnlocked(self):
1997     """Update the queue size.
1998
1999     """
2000     self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
2001
2002   @locking.ssynchronized(_LOCK)
2003   @_RequireOpenQueue
2004   def SetDrainFlag(self, drain_flag):
2005     """Sets the drain flag for the queue.
2006
2007     @type drain_flag: boolean
2008     @param drain_flag: Whether to set or unset the drain flag
2009
2010     """
2011     jstore.SetDrainFlag(drain_flag)
2012
2013     self._drained = drain_flag
2014
2015     return True
2016
2017   @_RequireOpenQueue
2018   def _SubmitJobUnlocked(self, job_id, ops):
2019     """Create and store a new job.
2020
2021     This enters the job into our job queue and also puts it on the new
2022     queue, in order for it to be picked up by the queue processors.
2023
2024     @type job_id: job ID
2025     @param job_id: the job ID for the new job
2026     @type ops: list
2027     @param ops: The list of OpCodes that will become the new job.
2028     @rtype: L{_QueuedJob}
2029     @return: the job object to be queued
2030     @raise errors.JobQueueFull: if the job queue has too many jobs in it
2031     @raise errors.GenericError: If an opcode is not valid
2032
2033     """
2034     if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
2035       raise errors.JobQueueFull()
2036
2037     job = _QueuedJob(self, job_id, ops, True)
2038
2039     # Check priority
2040     for idx, op in enumerate(job.ops):
2041       if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
2042         allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2043         raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
2044                                   " are %s" % (idx, op.priority, allowed))
2045
2046       dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
2047       if not opcodes.TNoRelativeJobDependencies(dependencies):
2048         raise errors.GenericError("Opcode %s has invalid dependencies, must"
2049                                   " match %s: %s" %
2050                                   (idx, opcodes.TNoRelativeJobDependencies,
2051                                    dependencies))
2052
2053     # Write to disk
2054     self.UpdateJobUnlocked(job)
2055
2056     self._queue_size += 1
2057
2058     logging.debug("Adding new job %s to the cache", job_id)
2059     self._memcache[job_id] = job
2060
2061     return job
2062
2063   @locking.ssynchronized(_LOCK)
2064   @_RequireOpenQueue
2065   @_RequireNonDrainedQueue
2066   def SubmitJob(self, ops):
2067     """Create and store a new job.
2068
2069     @see: L{_SubmitJobUnlocked}
2070
2071     """
2072     (job_id, ) = self._NewSerialsUnlocked(1)
2073     self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
2074     return job_id
2075
2076   @locking.ssynchronized(_LOCK)
2077   @_RequireOpenQueue
2078   @_RequireNonDrainedQueue
2079   def SubmitManyJobs(self, jobs):
2080     """Create and store multiple jobs.
2081
2082     @see: L{_SubmitJobUnlocked}
2083
2084     """
2085     all_job_ids = self._NewSerialsUnlocked(len(jobs))
2086
2087     (results, added_jobs) = \
2088       self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])
2089
2090     self._EnqueueJobsUnlocked(added_jobs)
2091
2092     return results
2093
2094   @staticmethod
2095   def _FormatSubmitError(msg, ops):
2096     """Formats errors which occurred while submitting a job.
2097
2098     """
2099     return ("%s; opcodes %s" %
2100             (msg, utils.CommaJoin(op.Summary() for op in ops)))
2101
2102   @staticmethod
2103   def _ResolveJobDependencies(resolve_fn, deps):
2104     """Resolves relative job IDs in dependencies.
2105
2106     @type resolve_fn: callable
2107     @param resolve_fn: Function to resolve a relative job ID
2108     @type deps: list
2109     @param deps: Dependencies
2110     @rtype: list
2111     @return: Resolved dependencies
2112
2113     """
2114     result = []
2115
2116     for (dep_job_id, dep_status) in deps:
2117       if ht.TRelativeJobId(dep_job_id):
2118         assert ht.TInt(dep_job_id) and dep_job_id < 0
2119         try:
2120           job_id = resolve_fn(dep_job_id)
2121         except IndexError:
2122           # Abort
2123           return (False, "Unable to resolve relative job ID %s" % dep_job_id)
2124       else:
2125         job_id = dep_job_id
2126
2127       result.append((job_id, dep_status))
2128
2129     return (True, result)
2130
2131   def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
2132     """Create and store multiple jobs.
2133
2134     @see: L{_SubmitJobUnlocked}
2135
2136     """
2137     results = []
2138     added_jobs = []
2139
2140     def resolve_fn(job_idx, reljobid):
2141       assert reljobid < 0
2142       return (previous_job_ids + job_ids[:job_idx])[reljobid]
2143
2144     for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
2145       for op in ops:
2146         if getattr(op, opcodes.DEPEND_ATTR, None):
2147           (status, data) = \
2148             self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
2149                                          op.depends)
2150           if not status:
2151             # Abort resolving dependencies
2152             assert ht.TNonEmptyString(data), "No error message"
2153             break
2154           # Use resolved dependencies
2155           op.depends = data
2156       else:
2157         try:
2158           job = self._SubmitJobUnlocked(job_id, ops)
2159         except errors.GenericError, err:
2160           status = False
2161           data = self._FormatSubmitError(str(err), ops)
2162         else:
2163           status = True
2164           data = job_id
2165           added_jobs.append(job)
2166
2167       results.append((status, data))
2168
2169     return (results, added_jobs)
2170
2171   @locking.ssynchronized(_LOCK)
2172   def _EnqueueJobs(self, jobs):
2173     """Helper function to add jobs to worker pool's queue.
2174
2175     @type jobs: list
2176     @param jobs: List of all jobs
2177
2178     """
2179     return self._EnqueueJobsUnlocked(jobs)
2180
2181   def _EnqueueJobsUnlocked(self, jobs):
2182     """Helper function to add jobs to worker pool's queue.
2183
2184     @type jobs: list
2185     @param jobs: List of all jobs
2186
2187     """
2188     assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
2189     self._wpool.AddManyTasks([(job, ) for job in jobs],
2190                              priority=[job.CalcPriority() for job in jobs])
2191
2192   def _GetJobStatusForDependencies(self, job_id):
2193     """Gets the status of a job for dependencies.
2194
2195     @type job_id: int
2196     @param job_id: Job ID
2197     @raise errors.JobLost: If job can't be found
2198
2199     """
2200     # Not using in-memory cache as doing so would require an exclusive lock
2201
2202     # Try to load from disk
2203     job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2204
2205     assert not job.writable, "Got writable job" # pylint: disable=E1101
2206
2207     if job:
2208       return job.CalcStatus()
2209
2210     raise errors.JobLost("Job %s not found" % job_id)
2211
2212   @_RequireOpenQueue
2213   def UpdateJobUnlocked(self, job, replicate=True):
2214     """Update a job's on disk storage.
2215
2216     After a job has been modified, this function needs to be called in
2217     order to write the changes to disk and replicate them to the other
2218     nodes.
2219
2220     @type job: L{_QueuedJob}
2221     @param job: the changed job
2222     @type replicate: boolean
2223     @param replicate: whether to replicate the change to remote nodes
2224
2225     """
2226     if __debug__:
2227       finalized = job.CalcStatus() in constants.JOBS_FINALIZED
2228       assert (finalized ^ (job.end_timestamp is None))
2229       assert job.writable, "Can't update read-only job"
2230
2231     filename = self._GetJobPath(job.id)
2232     data = serializer.DumpJson(job.Serialize())
2233     logging.debug("Writing job %s to %s", job.id, filename)
2234     self._UpdateJobQueueFile(filename, data, replicate)
2235
2236   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
2237                         timeout):
2238     """Waits for changes in a job.
2239
2240     @type job_id: int
2241     @param job_id: Job identifier
2242     @type fields: list of strings
2243     @param fields: Which fields to check for changes
2244     @type prev_job_info: list or None
2245     @param prev_job_info: Last job information returned
2246     @type prev_log_serial: int
2247     @param prev_log_serial: Last job message serial number
2248     @type timeout: float
2249     @param timeout: maximum time to wait in seconds
2250     @rtype: tuple (job info, log entries)
2251     @return: a tuple of the job information as required via
2252         the fields parameter, and the log entries as a list
2253
2254         if the job has not changed and the timeout has expired,
2255         we instead return a special value,
2256         L{constants.JOB_NOTCHANGED}, which should be interpreted
2257         as such by the clients
2258
2259     """
2260     load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, False,
2261                              writable=False)
2262
2263     helper = _WaitForJobChangesHelper()
2264
2265     return helper(self._GetJobPath(job_id), load_fn,
2266                   fields, prev_job_info, prev_log_serial, timeout)
2267
2268   @locking.ssynchronized(_LOCK)
2269   @_RequireOpenQueue
2270   def CancelJob(self, job_id):
2271     """Cancels a job.
2272
2273     This will only succeed if the job has not started yet.
2274
2275     @type job_id: int
2276     @param job_id: job ID of job to be cancelled.
2277
2278     """
2279     logging.info("Cancelling job %s", job_id)
2280
2281     job = self._LoadJobUnlocked(job_id)
2282     if not job:
2283       logging.debug("Job %s not found", job_id)
2284       return (False, "Job %s not found" % job_id)
2285
2286     assert job.writable, "Can't cancel read-only job"
2287
2288     (success, msg) = job.Cancel()
2289
2290     if success:
2291       # If the job was finalized (e.g. cancelled), this is the final write
2292       # allowed. The job can be archived anytime.
2293       self.UpdateJobUnlocked(job)
2294
2295     return (success, msg)
2296
2297   @_RequireOpenQueue
2298   def _ArchiveJobsUnlocked(self, jobs):
2299     """Archives jobs.
2300
2301     @type jobs: list of L{_QueuedJob}
2302     @param jobs: Job objects
2303     @rtype: int
2304     @return: Number of archived jobs
2305
2306     """
2307     archive_jobs = []
2308     rename_files = []
2309     for job in jobs:
2310       assert job.writable, "Can't archive read-only job"
2311
2312       if job.CalcStatus() not in constants.JOBS_FINALIZED:
2313         logging.debug("Job %s is not yet done", job.id)
2314         continue
2315
2316       archive_jobs.append(job)
2317
2318       old = self._GetJobPath(job.id)
2319       new = self._GetArchivedJobPath(job.id)
2320       rename_files.append((old, new))
2321
2322     # TODO: What if 1..n files fail to rename?
2323     self._RenameFilesUnlocked(rename_files)
2324
2325     logging.debug("Successfully archived job(s) %s",
2326                   utils.CommaJoin(job.id for job in archive_jobs))
2327
2328     # Since we haven't quite checked, above, if we succeeded or failed renaming
2329     # the files, we update the cached queue size from the filesystem. When we
2330     # get around to fix the TODO: above, we can use the number of actually
2331     # archived jobs to fix this.
2332     self._UpdateQueueSizeUnlocked()
2333     return len(archive_jobs)
2334
2335   @locking.ssynchronized(_LOCK)
2336   @_RequireOpenQueue
2337   def ArchiveJob(self, job_id):
2338     """Archives a job.
2339
2340     This is just a wrapper over L{_ArchiveJobsUnlocked}.
2341
2342     @type job_id: int
2343     @param job_id: Job ID of job to be archived.
2344     @rtype: bool
2345     @return: Whether job was archived
2346
2347     """
2348     logging.info("Archiving job %s", job_id)
2349
2350     job = self._LoadJobUnlocked(job_id)
2351     if not job:
2352       logging.debug("Job %s not found", job_id)
2353       return False
2354
2355     return self._ArchiveJobsUnlocked([job]) == 1
2356
2357   @locking.ssynchronized(_LOCK)
2358   @_RequireOpenQueue
2359   def AutoArchiveJobs(self, age, timeout):
2360     """Archives all jobs based on age.
2361
2362     The method will archive all jobs which are older than the age
2363     parameter. For jobs that don't have an end timestamp, the start
2364     timestamp will be considered. The special '-1' age will cause
2365     archival of all jobs (that are not running or queued).
2366
2367     @type age: int
2368     @param age: the minimum age in seconds
2369
2370     """
2371     logging.info("Archiving jobs with age more than %s seconds", age)
2372
2373     now = time.time()
2374     end_time = now + timeout
2375     archived_count = 0
2376     last_touched = 0
2377
2378     all_job_ids = self._GetJobIDsUnlocked()
2379     pending = []
2380     for idx, job_id in enumerate(all_job_ids):
2381       last_touched = idx + 1
2382
2383       # Not optimal because jobs could be pending
2384       # TODO: Measure average duration for job archival and take number of
2385       # pending jobs into account.
2386       if time.time() > end_time:
2387         break
2388
2389       # Returns None if the job failed to load
2390       job = self._LoadJobUnlocked(job_id)
2391       if job:
2392         if job.end_timestamp is None:
2393           if job.start_timestamp is None:
2394             job_age = job.received_timestamp
2395           else:
2396             job_age = job.start_timestamp
2397         else:
2398           job_age = job.end_timestamp
2399
2400         if age == -1 or now - job_age[0] > age:
2401           pending.append(job)
2402
2403           # Archive 10 jobs at a time
2404           if len(pending) >= 10:
2405             archived_count += self._ArchiveJobsUnlocked(pending)
2406             pending = []
2407
2408     if pending:
2409       archived_count += self._ArchiveJobsUnlocked(pending)
2410
2411     return (archived_count, len(all_job_ids) - last_touched)
2412
2413   def _Query(self, fields, qfilter):
2414     qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
2415                        namefield="id")
2416
2417     job_ids = qobj.RequestedNames()
2418
2419     list_all = (job_ids is None)
2420
2421     if list_all:
2422       # Since files are added to/removed from the queue atomically, there's no
2423       # risk of getting the job ids in an inconsistent state.
2424       job_ids = self._GetJobIDsUnlocked()
2425
2426     jobs = []
2427
2428     for job_id in job_ids:
2429       job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2430       if job is not None or not list_all:
2431         jobs.append((job_id, job))
2432
2433     return (qobj, jobs, list_all)
2434
2435   def QueryJobs(self, fields, qfilter):
2436     """Returns a list of jobs in queue.
2437
2438     @type fields: sequence
2439     @param fields: List of wanted fields
2440     @type qfilter: None or query2 filter (list)
2441     @param qfilter: Query filter
2442
2443     """
2444     (qobj, ctx, _) = self._Query(fields, qfilter)
2445
2446     return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
2447
2448   def OldStyleQueryJobs(self, job_ids, fields):
2449     """Returns a list of jobs in queue.
2450
2451     @type job_ids: list
2452     @param job_ids: sequence of job identifiers or None for all
2453     @type fields: list
2454     @param fields: names of fields to return
2455     @rtype: list
2456     @return: list one element per job, each element being list with
2457         the requested fields
2458
2459     """
2460     # backwards compat:
2461     job_ids = [int(jid) for jid in job_ids]
2462     qfilter = qlang.MakeSimpleFilter("id", job_ids)
2463
2464     (qobj, ctx, _) = self._Query(fields, qfilter)
2465
2466     return qobj.OldStyleQuery(ctx, sort_by_name=False)
2467
2468   @locking.ssynchronized(_LOCK)
2469   def PrepareShutdown(self):
2470     """Prepare to stop the job queue.
2471
2472     Disables execution of jobs in the workerpool and returns whether there are
2473     any jobs currently running. If the latter is the case, the job queue is not
2474     yet ready for shutdown. Once this function returns C{True} L{Shutdown} can
2475     be called without interfering with any job. Queued and unfinished jobs will
2476     be resumed next time.
2477
2478     Once this function has been called no new job submissions will be accepted
2479     (see L{_RequireNonDrainedQueue}).
2480
2481     @rtype: bool
2482     @return: Whether there are any running jobs
2483
2484     """
2485     if self._accepting_jobs:
2486       self._accepting_jobs = False
2487
2488       # Tell worker pool to stop processing pending tasks
2489       self._wpool.SetActive(False)
2490
2491     return self._wpool.HasRunningTasks()
2492
2493   @locking.ssynchronized(_LOCK)
2494   @_RequireOpenQueue
2495   def Shutdown(self):
2496     """Stops the job queue.
2497
2498     This shutdowns all the worker threads an closes the queue.
2499
2500     """
2501     self._wpool.TerminateWorkers()
2502
2503     self._queue_filelock.Close()
2504     self._queue_filelock = None