jqueue: Add short delay before detecting job changes
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import logging
33 import errno
34 import re
35 import time
36 import weakref
37 import threading
38 import itertools
39
40 try:
41   # pylint: disable-msg=E0611
42   from pyinotify import pyinotify
43 except ImportError:
44   import pyinotify
45
46 from ganeti import asyncnotifier
47 from ganeti import constants
48 from ganeti import serializer
49 from ganeti import workerpool
50 from ganeti import locking
51 from ganeti import opcodes
52 from ganeti import errors
53 from ganeti import mcpu
54 from ganeti import utils
55 from ganeti import jstore
56 from ganeti import rpc
57 from ganeti import runtime
58 from ganeti import netutils
59 from ganeti import compat
60 from ganeti import ht
61
62
63 JOBQUEUE_THREADS = 25
64 JOBS_PER_ARCHIVE_DIRECTORY = 10000
65
66 # member lock names to be passed to @ssynchronized decorator
67 _LOCK = "_lock"
68 _QUEUE = "_queue"
69
70
71 class CancelJob(Exception):
72   """Special exception to cancel a job.
73
74   """
75
76
77 def TimeStampNow():
78   """Returns the current timestamp.
79
80   @rtype: tuple
81   @return: the current time in the (seconds, microseconds) format
82
83   """
84   return utils.SplitTime(time.time())
85
86
87 class _QueuedOpCode(object):
88   """Encapsulates an opcode object.
89
90   @ivar log: holds the execution log and consists of tuples
91   of the form C{(log_serial, timestamp, level, message)}
92   @ivar input: the OpCode we encapsulate
93   @ivar status: the current status
94   @ivar result: the result of the LU execution
95   @ivar start_timestamp: timestamp for the start of the execution
96   @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
97   @ivar stop_timestamp: timestamp for the end of the execution
98
99   """
100   __slots__ = ["input", "status", "result", "log", "priority",
101                "start_timestamp", "exec_timestamp", "end_timestamp",
102                "__weakref__"]
103
104   def __init__(self, op):
105     """Constructor for the _QuededOpCode.
106
107     @type op: L{opcodes.OpCode}
108     @param op: the opcode we encapsulate
109
110     """
111     self.input = op
112     self.status = constants.OP_STATUS_QUEUED
113     self.result = None
114     self.log = []
115     self.start_timestamp = None
116     self.exec_timestamp = None
117     self.end_timestamp = None
118
119     # Get initial priority (it might change during the lifetime of this opcode)
120     self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
121
122   @classmethod
123   def Restore(cls, state):
124     """Restore the _QueuedOpCode from the serialized form.
125
126     @type state: dict
127     @param state: the serialized state
128     @rtype: _QueuedOpCode
129     @return: a new _QueuedOpCode instance
130
131     """
132     obj = _QueuedOpCode.__new__(cls)
133     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
134     obj.status = state["status"]
135     obj.result = state["result"]
136     obj.log = state["log"]
137     obj.start_timestamp = state.get("start_timestamp", None)
138     obj.exec_timestamp = state.get("exec_timestamp", None)
139     obj.end_timestamp = state.get("end_timestamp", None)
140     obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
141     return obj
142
143   def Serialize(self):
144     """Serializes this _QueuedOpCode.
145
146     @rtype: dict
147     @return: the dictionary holding the serialized state
148
149     """
150     return {
151       "input": self.input.__getstate__(),
152       "status": self.status,
153       "result": self.result,
154       "log": self.log,
155       "start_timestamp": self.start_timestamp,
156       "exec_timestamp": self.exec_timestamp,
157       "end_timestamp": self.end_timestamp,
158       "priority": self.priority,
159       }
160
161
162 class _QueuedJob(object):
163   """In-memory job representation.
164
165   This is what we use to track the user-submitted jobs. Locking must
166   be taken care of by users of this class.
167
168   @type queue: L{JobQueue}
169   @ivar queue: the parent queue
170   @ivar id: the job ID
171   @type ops: list
172   @ivar ops: the list of _QueuedOpCode that constitute the job
173   @type log_serial: int
174   @ivar log_serial: holds the index for the next log entry
175   @ivar received_timestamp: the timestamp for when the job was received
176   @ivar start_timestmap: the timestamp for start of execution
177   @ivar end_timestamp: the timestamp for end of execution
178   @ivar writable: Whether the job is allowed to be modified
179
180   """
181   # pylint: disable-msg=W0212
182   __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
183                "received_timestamp", "start_timestamp", "end_timestamp",
184                "__weakref__", "processor_lock", "writable"]
185
186   def __init__(self, queue, job_id, ops, writable):
187     """Constructor for the _QueuedJob.
188
189     @type queue: L{JobQueue}
190     @param queue: our parent queue
191     @type job_id: job_id
192     @param job_id: our job id
193     @type ops: list
194     @param ops: the list of opcodes we hold, which will be encapsulated
195         in _QueuedOpCodes
196     @type writable: bool
197     @param writable: Whether job can be modified
198
199     """
200     if not ops:
201       raise errors.GenericError("A job needs at least one opcode")
202
203     self.queue = queue
204     self.id = job_id
205     self.ops = [_QueuedOpCode(op) for op in ops]
206     self.log_serial = 0
207     self.received_timestamp = TimeStampNow()
208     self.start_timestamp = None
209     self.end_timestamp = None
210
211     self._InitInMemory(self, writable)
212
213   @staticmethod
214   def _InitInMemory(obj, writable):
215     """Initializes in-memory variables.
216
217     """
218     obj.writable = writable
219     obj.ops_iter = None
220     obj.cur_opctx = None
221
222     # Read-only jobs are not processed and therefore don't need a lock
223     if writable:
224       obj.processor_lock = threading.Lock()
225     else:
226       obj.processor_lock = None
227
228   def __repr__(self):
229     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
230               "id=%s" % self.id,
231               "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
232
233     return "<%s at %#x>" % (" ".join(status), id(self))
234
235   @classmethod
236   def Restore(cls, queue, state, writable):
237     """Restore a _QueuedJob from serialized state:
238
239     @type queue: L{JobQueue}
240     @param queue: to which queue the restored job belongs
241     @type state: dict
242     @param state: the serialized state
243     @type writable: bool
244     @param writable: Whether job can be modified
245     @rtype: _JobQueue
246     @return: the restored _JobQueue instance
247
248     """
249     obj = _QueuedJob.__new__(cls)
250     obj.queue = queue
251     obj.id = state["id"]
252     obj.received_timestamp = state.get("received_timestamp", None)
253     obj.start_timestamp = state.get("start_timestamp", None)
254     obj.end_timestamp = state.get("end_timestamp", None)
255
256     obj.ops = []
257     obj.log_serial = 0
258     for op_state in state["ops"]:
259       op = _QueuedOpCode.Restore(op_state)
260       for log_entry in op.log:
261         obj.log_serial = max(obj.log_serial, log_entry[0])
262       obj.ops.append(op)
263
264     cls._InitInMemory(obj, writable)
265
266     return obj
267
268   def Serialize(self):
269     """Serialize the _JobQueue instance.
270
271     @rtype: dict
272     @return: the serialized state
273
274     """
275     return {
276       "id": self.id,
277       "ops": [op.Serialize() for op in self.ops],
278       "start_timestamp": self.start_timestamp,
279       "end_timestamp": self.end_timestamp,
280       "received_timestamp": self.received_timestamp,
281       }
282
283   def CalcStatus(self):
284     """Compute the status of this job.
285
286     This function iterates over all the _QueuedOpCodes in the job and
287     based on their status, computes the job status.
288
289     The algorithm is:
290       - if we find a cancelled, or finished with error, the job
291         status will be the same
292       - otherwise, the last opcode with the status one of:
293           - waitlock
294           - canceling
295           - running
296
297         will determine the job status
298
299       - otherwise, it means either all opcodes are queued, or success,
300         and the job status will be the same
301
302     @return: the job status
303
304     """
305     status = constants.JOB_STATUS_QUEUED
306
307     all_success = True
308     for op in self.ops:
309       if op.status == constants.OP_STATUS_SUCCESS:
310         continue
311
312       all_success = False
313
314       if op.status == constants.OP_STATUS_QUEUED:
315         pass
316       elif op.status == constants.OP_STATUS_WAITING:
317         status = constants.JOB_STATUS_WAITING
318       elif op.status == constants.OP_STATUS_RUNNING:
319         status = constants.JOB_STATUS_RUNNING
320       elif op.status == constants.OP_STATUS_CANCELING:
321         status = constants.JOB_STATUS_CANCELING
322         break
323       elif op.status == constants.OP_STATUS_ERROR:
324         status = constants.JOB_STATUS_ERROR
325         # The whole job fails if one opcode failed
326         break
327       elif op.status == constants.OP_STATUS_CANCELED:
328         status = constants.OP_STATUS_CANCELED
329         break
330
331     if all_success:
332       status = constants.JOB_STATUS_SUCCESS
333
334     return status
335
336   def CalcPriority(self):
337     """Gets the current priority for this job.
338
339     Only unfinished opcodes are considered. When all are done, the default
340     priority is used.
341
342     @rtype: int
343
344     """
345     priorities = [op.priority for op in self.ops
346                   if op.status not in constants.OPS_FINALIZED]
347
348     if not priorities:
349       # All opcodes are done, assume default priority
350       return constants.OP_PRIO_DEFAULT
351
352     return min(priorities)
353
354   def GetLogEntries(self, newer_than):
355     """Selectively returns the log entries.
356
357     @type newer_than: None or int
358     @param newer_than: if this is None, return all log entries,
359         otherwise return only the log entries with serial higher
360         than this value
361     @rtype: list
362     @return: the list of the log entries selected
363
364     """
365     if newer_than is None:
366       serial = -1
367     else:
368       serial = newer_than
369
370     entries = []
371     for op in self.ops:
372       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
373
374     return entries
375
376   def GetInfo(self, fields):
377     """Returns information about a job.
378
379     @type fields: list
380     @param fields: names of fields to return
381     @rtype: list
382     @return: list with one element for each field
383     @raise errors.OpExecError: when an invalid field
384         has been passed
385
386     """
387     row = []
388     for fname in fields:
389       if fname == "id":
390         row.append(self.id)
391       elif fname == "status":
392         row.append(self.CalcStatus())
393       elif fname == "priority":
394         row.append(self.CalcPriority())
395       elif fname == "ops":
396         row.append([op.input.__getstate__() for op in self.ops])
397       elif fname == "opresult":
398         row.append([op.result for op in self.ops])
399       elif fname == "opstatus":
400         row.append([op.status for op in self.ops])
401       elif fname == "oplog":
402         row.append([op.log for op in self.ops])
403       elif fname == "opstart":
404         row.append([op.start_timestamp for op in self.ops])
405       elif fname == "opexec":
406         row.append([op.exec_timestamp for op in self.ops])
407       elif fname == "opend":
408         row.append([op.end_timestamp for op in self.ops])
409       elif fname == "oppriority":
410         row.append([op.priority for op in self.ops])
411       elif fname == "received_ts":
412         row.append(self.received_timestamp)
413       elif fname == "start_ts":
414         row.append(self.start_timestamp)
415       elif fname == "end_ts":
416         row.append(self.end_timestamp)
417       elif fname == "summary":
418         row.append([op.input.Summary() for op in self.ops])
419       else:
420         raise errors.OpExecError("Invalid self query field '%s'" % fname)
421     return row
422
423   def MarkUnfinishedOps(self, status, result):
424     """Mark unfinished opcodes with a given status and result.
425
426     This is an utility function for marking all running or waiting to
427     be run opcodes with a given status. Opcodes which are already
428     finalised are not changed.
429
430     @param status: a given opcode status
431     @param result: the opcode result
432
433     """
434     not_marked = True
435     for op in self.ops:
436       if op.status in constants.OPS_FINALIZED:
437         assert not_marked, "Finalized opcodes found after non-finalized ones"
438         continue
439       op.status = status
440       op.result = result
441       not_marked = False
442
443   def Finalize(self):
444     """Marks the job as finalized.
445
446     """
447     self.end_timestamp = TimeStampNow()
448
449   def Cancel(self):
450     """Marks job as canceled/-ing if possible.
451
452     @rtype: tuple; (bool, string)
453     @return: Boolean describing whether job was successfully canceled or marked
454       as canceling and a text message
455
456     """
457     status = self.CalcStatus()
458
459     if status == constants.JOB_STATUS_QUEUED:
460       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
461                              "Job canceled by request")
462       self.Finalize()
463       return (True, "Job %s canceled" % self.id)
464
465     elif status == constants.JOB_STATUS_WAITING:
466       # The worker will notice the new status and cancel the job
467       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
468       return (True, "Job %s will be canceled" % self.id)
469
470     else:
471       logging.debug("Job %s is no longer waiting in the queue", self.id)
472       return (False, "Job %s is no longer waiting in the queue" % self.id)
473
474
475 class _OpExecCallbacks(mcpu.OpExecCbBase):
476   def __init__(self, queue, job, op):
477     """Initializes this class.
478
479     @type queue: L{JobQueue}
480     @param queue: Job queue
481     @type job: L{_QueuedJob}
482     @param job: Job object
483     @type op: L{_QueuedOpCode}
484     @param op: OpCode
485
486     """
487     assert queue, "Queue is missing"
488     assert job, "Job is missing"
489     assert op, "Opcode is missing"
490
491     self._queue = queue
492     self._job = job
493     self._op = op
494
495   def _CheckCancel(self):
496     """Raises an exception to cancel the job if asked to.
497
498     """
499     # Cancel here if we were asked to
500     if self._op.status == constants.OP_STATUS_CANCELING:
501       logging.debug("Canceling opcode")
502       raise CancelJob()
503
504   @locking.ssynchronized(_QUEUE, shared=1)
505   def NotifyStart(self):
506     """Mark the opcode as running, not lock-waiting.
507
508     This is called from the mcpu code as a notifier function, when the LU is
509     finally about to start the Exec() method. Of course, to have end-user
510     visible results, the opcode must be initially (before calling into
511     Processor.ExecOpCode) set to OP_STATUS_WAITING.
512
513     """
514     assert self._op in self._job.ops
515     assert self._op.status in (constants.OP_STATUS_WAITING,
516                                constants.OP_STATUS_CANCELING)
517
518     # Cancel here if we were asked to
519     self._CheckCancel()
520
521     logging.debug("Opcode is now running")
522
523     self._op.status = constants.OP_STATUS_RUNNING
524     self._op.exec_timestamp = TimeStampNow()
525
526     # And finally replicate the job status
527     self._queue.UpdateJobUnlocked(self._job)
528
529   @locking.ssynchronized(_QUEUE, shared=1)
530   def _AppendFeedback(self, timestamp, log_type, log_msg):
531     """Internal feedback append function, with locks
532
533     """
534     self._job.log_serial += 1
535     self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
536     self._queue.UpdateJobUnlocked(self._job, replicate=False)
537
538   def Feedback(self, *args):
539     """Append a log entry.
540
541     """
542     assert len(args) < 3
543
544     if len(args) == 1:
545       log_type = constants.ELOG_MESSAGE
546       log_msg = args[0]
547     else:
548       (log_type, log_msg) = args
549
550     # The time is split to make serialization easier and not lose
551     # precision.
552     timestamp = utils.SplitTime(time.time())
553     self._AppendFeedback(timestamp, log_type, log_msg)
554
555   def CheckCancel(self):
556     """Check whether job has been cancelled.
557
558     """
559     assert self._op.status in (constants.OP_STATUS_WAITING,
560                                constants.OP_STATUS_CANCELING)
561
562     # Cancel here if we were asked to
563     self._CheckCancel()
564
565   def SubmitManyJobs(self, jobs):
566     """Submits jobs for processing.
567
568     See L{JobQueue.SubmitManyJobs}.
569
570     """
571     # Locking is done in job queue
572     return self._queue.SubmitManyJobs(jobs)
573
574
575 class _JobChangesChecker(object):
576   def __init__(self, fields, prev_job_info, prev_log_serial):
577     """Initializes this class.
578
579     @type fields: list of strings
580     @param fields: Fields requested by LUXI client
581     @type prev_job_info: string
582     @param prev_job_info: previous job info, as passed by the LUXI client
583     @type prev_log_serial: string
584     @param prev_log_serial: previous job serial, as passed by the LUXI client
585
586     """
587     self._fields = fields
588     self._prev_job_info = prev_job_info
589     self._prev_log_serial = prev_log_serial
590
591   def __call__(self, job):
592     """Checks whether job has changed.
593
594     @type job: L{_QueuedJob}
595     @param job: Job object
596
597     """
598     assert not job.writable, "Expected read-only job"
599
600     status = job.CalcStatus()
601     job_info = job.GetInfo(self._fields)
602     log_entries = job.GetLogEntries(self._prev_log_serial)
603
604     # Serializing and deserializing data can cause type changes (e.g. from
605     # tuple to list) or precision loss. We're doing it here so that we get
606     # the same modifications as the data received from the client. Without
607     # this, the comparison afterwards might fail without the data being
608     # significantly different.
609     # TODO: we just deserialized from disk, investigate how to make sure that
610     # the job info and log entries are compatible to avoid this further step.
611     # TODO: Doing something like in testutils.py:UnifyValueType might be more
612     # efficient, though floats will be tricky
613     job_info = serializer.LoadJson(serializer.DumpJson(job_info))
614     log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
615
616     # Don't even try to wait if the job is no longer running, there will be
617     # no changes.
618     if (status not in (constants.JOB_STATUS_QUEUED,
619                        constants.JOB_STATUS_RUNNING,
620                        constants.JOB_STATUS_WAITING) or
621         job_info != self._prev_job_info or
622         (log_entries and self._prev_log_serial != log_entries[0][0])):
623       logging.debug("Job %s changed", job.id)
624       return (job_info, log_entries)
625
626     return None
627
628
629 class _JobFileChangesWaiter(object):
630   def __init__(self, filename):
631     """Initializes this class.
632
633     @type filename: string
634     @param filename: Path to job file
635     @raises errors.InotifyError: if the notifier cannot be setup
636
637     """
638     self._wm = pyinotify.WatchManager()
639     self._inotify_handler = \
640       asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
641     self._notifier = \
642       pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
643     try:
644       self._inotify_handler.enable()
645     except Exception:
646       # pyinotify doesn't close file descriptors automatically
647       self._notifier.stop()
648       raise
649
650   def _OnInotify(self, notifier_enabled):
651     """Callback for inotify.
652
653     """
654     if not notifier_enabled:
655       self._inotify_handler.enable()
656
657   def Wait(self, timeout):
658     """Waits for the job file to change.
659
660     @type timeout: float
661     @param timeout: Timeout in seconds
662     @return: Whether there have been events
663
664     """
665     assert timeout >= 0
666     have_events = self._notifier.check_events(timeout * 1000)
667     if have_events:
668       self._notifier.read_events()
669     self._notifier.process_events()
670     return have_events
671
672   def Close(self):
673     """Closes underlying notifier and its file descriptor.
674
675     """
676     self._notifier.stop()
677
678
679 class _JobChangesWaiter(object):
680   def __init__(self, filename):
681     """Initializes this class.
682
683     @type filename: string
684     @param filename: Path to job file
685
686     """
687     self._filewaiter = None
688     self._filename = filename
689
690   def Wait(self, timeout):
691     """Waits for a job to change.
692
693     @type timeout: float
694     @param timeout: Timeout in seconds
695     @return: Whether there have been events
696
697     """
698     if self._filewaiter:
699       return self._filewaiter.Wait(timeout)
700
701     # Lazy setup: Avoid inotify setup cost when job file has already changed.
702     # If this point is reached, return immediately and let caller check the job
703     # file again in case there were changes since the last check. This avoids a
704     # race condition.
705     self._filewaiter = _JobFileChangesWaiter(self._filename)
706
707     return True
708
709   def Close(self):
710     """Closes underlying waiter.
711
712     """
713     if self._filewaiter:
714       self._filewaiter.Close()
715
716
717 class _WaitForJobChangesHelper(object):
718   """Helper class using inotify to wait for changes in a job file.
719
720   This class takes a previous job status and serial, and alerts the client when
721   the current job status has changed.
722
723   """
724   @staticmethod
725   def _CheckForChanges(counter, job_load_fn, check_fn):
726     if counter.next() > 0:
727       # If this isn't the first check the job is given some more time to change
728       # again. This gives better performance for jobs generating many
729       # changes/messages.
730       time.sleep(0.1)
731
732     job = job_load_fn()
733     if not job:
734       raise errors.JobLost()
735
736     result = check_fn(job)
737     if result is None:
738       raise utils.RetryAgain()
739
740     return result
741
742   def __call__(self, filename, job_load_fn,
743                fields, prev_job_info, prev_log_serial, timeout):
744     """Waits for changes on a job.
745
746     @type filename: string
747     @param filename: File on which to wait for changes
748     @type job_load_fn: callable
749     @param job_load_fn: Function to load job
750     @type fields: list of strings
751     @param fields: Which fields to check for changes
752     @type prev_job_info: list or None
753     @param prev_job_info: Last job information returned
754     @type prev_log_serial: int
755     @param prev_log_serial: Last job message serial number
756     @type timeout: float
757     @param timeout: maximum time to wait in seconds
758
759     """
760     counter = itertools.count()
761     try:
762       check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
763       waiter = _JobChangesWaiter(filename)
764       try:
765         return utils.Retry(compat.partial(self._CheckForChanges,
766                                           counter, job_load_fn, check_fn),
767                            utils.RETRY_REMAINING_TIME, timeout,
768                            wait_fn=waiter.Wait)
769       finally:
770         waiter.Close()
771     except (errors.InotifyError, errors.JobLost):
772       return None
773     except utils.RetryTimeout:
774       return constants.JOB_NOTCHANGED
775
776
777 def _EncodeOpError(err):
778   """Encodes an error which occurred while processing an opcode.
779
780   """
781   if isinstance(err, errors.GenericError):
782     to_encode = err
783   else:
784     to_encode = errors.OpExecError(str(err))
785
786   return errors.EncodeException(to_encode)
787
788
789 class _TimeoutStrategyWrapper:
790   def __init__(self, fn):
791     """Initializes this class.
792
793     """
794     self._fn = fn
795     self._next = None
796
797   def _Advance(self):
798     """Gets the next timeout if necessary.
799
800     """
801     if self._next is None:
802       self._next = self._fn()
803
804   def Peek(self):
805     """Returns the next timeout.
806
807     """
808     self._Advance()
809     return self._next
810
811   def Next(self):
812     """Returns the current timeout and advances the internal state.
813
814     """
815     self._Advance()
816     result = self._next
817     self._next = None
818     return result
819
820
821 class _OpExecContext:
822   def __init__(self, op, index, log_prefix, timeout_strategy_factory):
823     """Initializes this class.
824
825     """
826     self.op = op
827     self.index = index
828     self.log_prefix = log_prefix
829     self.summary = op.input.Summary()
830
831     # Create local copy to modify
832     if getattr(op.input, opcodes.DEPEND_ATTR, None):
833       self.jobdeps = op.input.depends[:]
834     else:
835       self.jobdeps = None
836
837     self._timeout_strategy_factory = timeout_strategy_factory
838     self._ResetTimeoutStrategy()
839
840   def _ResetTimeoutStrategy(self):
841     """Creates a new timeout strategy.
842
843     """
844     self._timeout_strategy = \
845       _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
846
847   def CheckPriorityIncrease(self):
848     """Checks whether priority can and should be increased.
849
850     Called when locks couldn't be acquired.
851
852     """
853     op = self.op
854
855     # Exhausted all retries and next round should not use blocking acquire
856     # for locks?
857     if (self._timeout_strategy.Peek() is None and
858         op.priority > constants.OP_PRIO_HIGHEST):
859       logging.debug("Increasing priority")
860       op.priority -= 1
861       self._ResetTimeoutStrategy()
862       return True
863
864     return False
865
866   def GetNextLockTimeout(self):
867     """Returns the next lock acquire timeout.
868
869     """
870     return self._timeout_strategy.Next()
871
872
873 class _JobProcessor(object):
874   (DEFER,
875    WAITDEP,
876    FINISHED) = range(1, 4)
877
878   def __init__(self, queue, opexec_fn, job,
879                _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
880     """Initializes this class.
881
882     """
883     self.queue = queue
884     self.opexec_fn = opexec_fn
885     self.job = job
886     self._timeout_strategy_factory = _timeout_strategy_factory
887
888   @staticmethod
889   def _FindNextOpcode(job, timeout_strategy_factory):
890     """Locates the next opcode to run.
891
892     @type job: L{_QueuedJob}
893     @param job: Job object
894     @param timeout_strategy_factory: Callable to create new timeout strategy
895
896     """
897     # Create some sort of a cache to speed up locating next opcode for future
898     # lookups
899     # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
900     # pending and one for processed ops.
901     if job.ops_iter is None:
902       job.ops_iter = enumerate(job.ops)
903
904     # Find next opcode to run
905     while True:
906       try:
907         (idx, op) = job.ops_iter.next()
908       except StopIteration:
909         raise errors.ProgrammerError("Called for a finished job")
910
911       if op.status == constants.OP_STATUS_RUNNING:
912         # Found an opcode already marked as running
913         raise errors.ProgrammerError("Called for job marked as running")
914
915       opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
916                              timeout_strategy_factory)
917
918       if op.status not in constants.OPS_FINALIZED:
919         return opctx
920
921       # This is a job that was partially completed before master daemon
922       # shutdown, so it can be expected that some opcodes are already
923       # completed successfully (if any did error out, then the whole job
924       # should have been aborted and not resubmitted for processing).
925       logging.info("%s: opcode %s already processed, skipping",
926                    opctx.log_prefix, opctx.summary)
927
928   @staticmethod
929   def _MarkWaitlock(job, op):
930     """Marks an opcode as waiting for locks.
931
932     The job's start timestamp is also set if necessary.
933
934     @type job: L{_QueuedJob}
935     @param job: Job object
936     @type op: L{_QueuedOpCode}
937     @param op: Opcode object
938
939     """
940     assert op in job.ops
941     assert op.status in (constants.OP_STATUS_QUEUED,
942                          constants.OP_STATUS_WAITING)
943
944     update = False
945
946     op.result = None
947
948     if op.status == constants.OP_STATUS_QUEUED:
949       op.status = constants.OP_STATUS_WAITING
950       update = True
951
952     if op.start_timestamp is None:
953       op.start_timestamp = TimeStampNow()
954       update = True
955
956     if job.start_timestamp is None:
957       job.start_timestamp = op.start_timestamp
958       update = True
959
960     assert op.status == constants.OP_STATUS_WAITING
961
962     return update
963
964   @staticmethod
965   def _CheckDependencies(queue, job, opctx):
966     """Checks if an opcode has dependencies and if so, processes them.
967
968     @type queue: L{JobQueue}
969     @param queue: Queue object
970     @type job: L{_QueuedJob}
971     @param job: Job object
972     @type opctx: L{_OpExecContext}
973     @param opctx: Opcode execution context
974     @rtype: bool
975     @return: Whether opcode will be re-scheduled by dependency tracker
976
977     """
978     op = opctx.op
979
980     result = False
981
982     while opctx.jobdeps:
983       (dep_job_id, dep_status) = opctx.jobdeps[0]
984
985       (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
986                                                           dep_status)
987       assert ht.TNonEmptyString(depmsg), "No dependency message"
988
989       logging.info("%s: %s", opctx.log_prefix, depmsg)
990
991       if depresult == _JobDependencyManager.CONTINUE:
992         # Remove dependency and continue
993         opctx.jobdeps.pop(0)
994
995       elif depresult == _JobDependencyManager.WAIT:
996         # Need to wait for notification, dependency tracker will re-add job
997         # to workerpool
998         result = True
999         break
1000
1001       elif depresult == _JobDependencyManager.CANCEL:
1002         # Job was cancelled, cancel this job as well
1003         job.Cancel()
1004         assert op.status == constants.OP_STATUS_CANCELING
1005         break
1006
1007       elif depresult in (_JobDependencyManager.WRONGSTATUS,
1008                          _JobDependencyManager.ERROR):
1009         # Job failed or there was an error, this job must fail
1010         op.status = constants.OP_STATUS_ERROR
1011         op.result = _EncodeOpError(errors.OpExecError(depmsg))
1012         break
1013
1014       else:
1015         raise errors.ProgrammerError("Unknown dependency result '%s'" %
1016                                      depresult)
1017
1018     return result
1019
1020   def _ExecOpCodeUnlocked(self, opctx):
1021     """Processes one opcode and returns the result.
1022
1023     """
1024     op = opctx.op
1025
1026     assert op.status == constants.OP_STATUS_WAITING
1027
1028     timeout = opctx.GetNextLockTimeout()
1029
1030     try:
1031       # Make sure not to hold queue lock while calling ExecOpCode
1032       result = self.opexec_fn(op.input,
1033                               _OpExecCallbacks(self.queue, self.job, op),
1034                               timeout=timeout, priority=op.priority)
1035     except mcpu.LockAcquireTimeout:
1036       assert timeout is not None, "Received timeout for blocking acquire"
1037       logging.debug("Couldn't acquire locks in %0.6fs", timeout)
1038
1039       assert op.status in (constants.OP_STATUS_WAITING,
1040                            constants.OP_STATUS_CANCELING)
1041
1042       # Was job cancelled while we were waiting for the lock?
1043       if op.status == constants.OP_STATUS_CANCELING:
1044         return (constants.OP_STATUS_CANCELING, None)
1045
1046       # Stay in waitlock while trying to re-acquire lock
1047       return (constants.OP_STATUS_WAITING, None)
1048     except CancelJob:
1049       logging.exception("%s: Canceling job", opctx.log_prefix)
1050       assert op.status == constants.OP_STATUS_CANCELING
1051       return (constants.OP_STATUS_CANCELING, None)
1052     except Exception, err: # pylint: disable-msg=W0703
1053       logging.exception("%s: Caught exception in %s",
1054                         opctx.log_prefix, opctx.summary)
1055       return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
1056     else:
1057       logging.debug("%s: %s successful",
1058                     opctx.log_prefix, opctx.summary)
1059       return (constants.OP_STATUS_SUCCESS, result)
1060
1061   def __call__(self, _nextop_fn=None):
1062     """Continues execution of a job.
1063
1064     @param _nextop_fn: Callback function for tests
1065     @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
1066       be deferred and C{WAITDEP} if the dependency manager
1067       (L{_JobDependencyManager}) will re-schedule the job when appropriate
1068
1069     """
1070     queue = self.queue
1071     job = self.job
1072
1073     logging.debug("Processing job %s", job.id)
1074
1075     queue.acquire(shared=1)
1076     try:
1077       opcount = len(job.ops)
1078
1079       assert job.writable, "Expected writable job"
1080
1081       # Don't do anything for finalized jobs
1082       if job.CalcStatus() in constants.JOBS_FINALIZED:
1083         return self.FINISHED
1084
1085       # Is a previous opcode still pending?
1086       if job.cur_opctx:
1087         opctx = job.cur_opctx
1088         job.cur_opctx = None
1089       else:
1090         if __debug__ and _nextop_fn:
1091           _nextop_fn()
1092         opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
1093
1094       op = opctx.op
1095
1096       # Consistency check
1097       assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
1098                                      constants.OP_STATUS_CANCELING)
1099                         for i in job.ops[opctx.index + 1:])
1100
1101       assert op.status in (constants.OP_STATUS_QUEUED,
1102                            constants.OP_STATUS_WAITING,
1103                            constants.OP_STATUS_CANCELING)
1104
1105       assert (op.priority <= constants.OP_PRIO_LOWEST and
1106               op.priority >= constants.OP_PRIO_HIGHEST)
1107
1108       waitjob = None
1109
1110       if op.status != constants.OP_STATUS_CANCELING:
1111         assert op.status in (constants.OP_STATUS_QUEUED,
1112                              constants.OP_STATUS_WAITING)
1113
1114         # Prepare to start opcode
1115         if self._MarkWaitlock(job, op):
1116           # Write to disk
1117           queue.UpdateJobUnlocked(job)
1118
1119         assert op.status == constants.OP_STATUS_WAITING
1120         assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1121         assert job.start_timestamp and op.start_timestamp
1122         assert waitjob is None
1123
1124         # Check if waiting for a job is necessary
1125         waitjob = self._CheckDependencies(queue, job, opctx)
1126
1127         assert op.status in (constants.OP_STATUS_WAITING,
1128                              constants.OP_STATUS_CANCELING,
1129                              constants.OP_STATUS_ERROR)
1130
1131         if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1132                                          constants.OP_STATUS_ERROR)):
1133           logging.info("%s: opcode %s waiting for locks",
1134                        opctx.log_prefix, opctx.summary)
1135
1136           assert not opctx.jobdeps, "Not all dependencies were removed"
1137
1138           queue.release()
1139           try:
1140             (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1141           finally:
1142             queue.acquire(shared=1)
1143
1144           op.status = op_status
1145           op.result = op_result
1146
1147           assert not waitjob
1148
1149         if op.status == constants.OP_STATUS_WAITING:
1150           # Couldn't get locks in time
1151           assert not op.end_timestamp
1152         else:
1153           # Finalize opcode
1154           op.end_timestamp = TimeStampNow()
1155
1156           if op.status == constants.OP_STATUS_CANCELING:
1157             assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1158                                   for i in job.ops[opctx.index:])
1159           else:
1160             assert op.status in constants.OPS_FINALIZED
1161
1162       if op.status == constants.OP_STATUS_WAITING or waitjob:
1163         finalize = False
1164
1165         if not waitjob and opctx.CheckPriorityIncrease():
1166           # Priority was changed, need to update on-disk file
1167           queue.UpdateJobUnlocked(job)
1168
1169         # Keep around for another round
1170         job.cur_opctx = opctx
1171
1172         assert (op.priority <= constants.OP_PRIO_LOWEST and
1173                 op.priority >= constants.OP_PRIO_HIGHEST)
1174
1175         # In no case must the status be finalized here
1176         assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1177
1178       else:
1179         # Ensure all opcodes so far have been successful
1180         assert (opctx.index == 0 or
1181                 compat.all(i.status == constants.OP_STATUS_SUCCESS
1182                            for i in job.ops[:opctx.index]))
1183
1184         # Reset context
1185         job.cur_opctx = None
1186
1187         if op.status == constants.OP_STATUS_SUCCESS:
1188           finalize = False
1189
1190         elif op.status == constants.OP_STATUS_ERROR:
1191           # Ensure failed opcode has an exception as its result
1192           assert errors.GetEncodedError(job.ops[opctx.index].result)
1193
1194           to_encode = errors.OpExecError("Preceding opcode failed")
1195           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1196                                 _EncodeOpError(to_encode))
1197           finalize = True
1198
1199           # Consistency check
1200           assert compat.all(i.status == constants.OP_STATUS_ERROR and
1201                             errors.GetEncodedError(i.result)
1202                             for i in job.ops[opctx.index:])
1203
1204         elif op.status == constants.OP_STATUS_CANCELING:
1205           job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1206                                 "Job canceled by request")
1207           finalize = True
1208
1209         else:
1210           raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1211
1212         if opctx.index == (opcount - 1):
1213           # Finalize on last opcode
1214           finalize = True
1215
1216         if finalize:
1217           # All opcodes have been run, finalize job
1218           job.Finalize()
1219
1220         # Write to disk. If the job status is final, this is the final write
1221         # allowed. Once the file has been written, it can be archived anytime.
1222         queue.UpdateJobUnlocked(job)
1223
1224         assert not waitjob
1225
1226         if finalize:
1227           logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1228           return self.FINISHED
1229
1230       assert not waitjob or queue.depmgr.JobWaiting(job)
1231
1232       if waitjob:
1233         return self.WAITDEP
1234       else:
1235         return self.DEFER
1236     finally:
1237       assert job.writable, "Job became read-only while being processed"
1238       queue.release()
1239
1240
1241 class _JobQueueWorker(workerpool.BaseWorker):
1242   """The actual job workers.
1243
1244   """
1245   def RunTask(self, job): # pylint: disable-msg=W0221
1246     """Job executor.
1247
1248     @type job: L{_QueuedJob}
1249     @param job: the job to be processed
1250
1251     """
1252     assert job.writable, "Expected writable job"
1253
1254     # Ensure only one worker is active on a single job. If a job registers for
1255     # a dependency job, and the other job notifies before the first worker is
1256     # done, the job can end up in the tasklist more than once.
1257     job.processor_lock.acquire()
1258     try:
1259       return self._RunTaskInner(job)
1260     finally:
1261       job.processor_lock.release()
1262
1263   def _RunTaskInner(self, job):
1264     """Executes a job.
1265
1266     Must be called with per-job lock acquired.
1267
1268     """
1269     queue = job.queue
1270     assert queue == self.pool.queue
1271
1272     setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
1273     setname_fn(None)
1274
1275     proc = mcpu.Processor(queue.context, job.id)
1276
1277     # Create wrapper for setting thread name
1278     wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
1279                                     proc.ExecOpCode)
1280
1281     result = _JobProcessor(queue, wrap_execop_fn, job)()
1282
1283     if result == _JobProcessor.FINISHED:
1284       # Notify waiting jobs
1285       queue.depmgr.NotifyWaiters(job.id)
1286
1287     elif result == _JobProcessor.DEFER:
1288       # Schedule again
1289       raise workerpool.DeferTask(priority=job.CalcPriority())
1290
1291     elif result == _JobProcessor.WAITDEP:
1292       # No-op, dependency manager will re-schedule
1293       pass
1294
1295     else:
1296       raise errors.ProgrammerError("Job processor returned unknown status %s" %
1297                                    (result, ))
1298
1299   @staticmethod
1300   def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
1301     """Updates the worker thread name to include a short summary of the opcode.
1302
1303     @param setname_fn: Callable setting worker thread name
1304     @param execop_fn: Callable for executing opcode (usually
1305                       L{mcpu.Processor.ExecOpCode})
1306
1307     """
1308     setname_fn(op)
1309     try:
1310       return execop_fn(op, *args, **kwargs)
1311     finally:
1312       setname_fn(None)
1313
1314   @staticmethod
1315   def _GetWorkerName(job, op):
1316     """Sets the worker thread name.
1317
1318     @type job: L{_QueuedJob}
1319     @type op: L{opcodes.OpCode}
1320
1321     """
1322     parts = ["Job%s" % job.id]
1323
1324     if op:
1325       parts.append(op.TinySummary())
1326
1327     return "/".join(parts)
1328
1329
1330 class _JobQueueWorkerPool(workerpool.WorkerPool):
1331   """Simple class implementing a job-processing workerpool.
1332
1333   """
1334   def __init__(self, queue):
1335     super(_JobQueueWorkerPool, self).__init__("Jq",
1336                                               JOBQUEUE_THREADS,
1337                                               _JobQueueWorker)
1338     self.queue = queue
1339
1340
1341 class _JobDependencyManager:
1342   """Keeps track of job dependencies.
1343
1344   """
1345   (WAIT,
1346    ERROR,
1347    CANCEL,
1348    CONTINUE,
1349    WRONGSTATUS) = range(1, 6)
1350
1351   def __init__(self, getstatus_fn, enqueue_fn):
1352     """Initializes this class.
1353
1354     """
1355     self._getstatus_fn = getstatus_fn
1356     self._enqueue_fn = enqueue_fn
1357
1358     self._waiters = {}
1359     self._lock = locking.SharedLock("JobDepMgr")
1360
1361   @locking.ssynchronized(_LOCK, shared=1)
1362   def GetLockInfo(self, requested): # pylint: disable-msg=W0613
1363     """Retrieves information about waiting jobs.
1364
1365     @type requested: set
1366     @param requested: Requested information, see C{query.LQ_*}
1367
1368     """
1369     # No need to sort here, that's being done by the lock manager and query
1370     # library. There are no priorities for notifying jobs, hence all show up as
1371     # one item under "pending".
1372     return [("job/%s" % job_id, None, None,
1373              [("job", [job.id for job in waiters])])
1374             for job_id, waiters in self._waiters.items()
1375             if waiters]
1376
1377   @locking.ssynchronized(_LOCK, shared=1)
1378   def JobWaiting(self, job):
1379     """Checks if a job is waiting.
1380
1381     """
1382     return compat.any(job in jobs
1383                       for jobs in self._waiters.values())
1384
1385   @locking.ssynchronized(_LOCK)
1386   def CheckAndRegister(self, job, dep_job_id, dep_status):
1387     """Checks if a dependency job has the requested status.
1388
1389     If the other job is not yet in a finalized status, the calling job will be
1390     notified (re-added to the workerpool) at a later point.
1391
1392     @type job: L{_QueuedJob}
1393     @param job: Job object
1394     @type dep_job_id: string
1395     @param dep_job_id: ID of dependency job
1396     @type dep_status: list
1397     @param dep_status: Required status
1398
1399     """
1400     assert ht.TString(job.id)
1401     assert ht.TString(dep_job_id)
1402     assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1403
1404     if job.id == dep_job_id:
1405       return (self.ERROR, "Job can't depend on itself")
1406
1407     # Get status of dependency job
1408     try:
1409       status = self._getstatus_fn(dep_job_id)
1410     except errors.JobLost, err:
1411       return (self.ERROR, "Dependency error: %s" % err)
1412
1413     assert status in constants.JOB_STATUS_ALL
1414
1415     job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1416
1417     if status not in constants.JOBS_FINALIZED:
1418       # Register for notification and wait for job to finish
1419       job_id_waiters.add(job)
1420       return (self.WAIT,
1421               "Need to wait for job %s, wanted status '%s'" %
1422               (dep_job_id, dep_status))
1423
1424     # Remove from waiters list
1425     if job in job_id_waiters:
1426       job_id_waiters.remove(job)
1427
1428     if (status == constants.JOB_STATUS_CANCELED and
1429         constants.JOB_STATUS_CANCELED not in dep_status):
1430       return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1431
1432     elif not dep_status or status in dep_status:
1433       return (self.CONTINUE,
1434               "Dependency job %s finished with status '%s'" %
1435               (dep_job_id, status))
1436
1437     else:
1438       return (self.WRONGSTATUS,
1439               "Dependency job %s finished with status '%s',"
1440               " not one of '%s' as required" %
1441               (dep_job_id, status, utils.CommaJoin(dep_status)))
1442
1443   @locking.ssynchronized(_LOCK)
1444   def NotifyWaiters(self, job_id):
1445     """Notifies all jobs waiting for a certain job ID.
1446
1447     @type job_id: string
1448     @param job_id: Job ID
1449
1450     """
1451     assert ht.TString(job_id)
1452
1453     jobs = self._waiters.pop(job_id, None)
1454     if jobs:
1455       # Re-add jobs to workerpool
1456       logging.debug("Re-adding %s jobs which were waiting for job %s",
1457                     len(jobs), job_id)
1458       self._enqueue_fn(jobs)
1459
1460     # Remove all jobs without actual waiters
1461     for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1462                    if not waiters]:
1463       del self._waiters[job_id]
1464
1465
1466 def _RequireOpenQueue(fn):
1467   """Decorator for "public" functions.
1468
1469   This function should be used for all 'public' functions. That is,
1470   functions usually called from other classes. Note that this should
1471   be applied only to methods (not plain functions), since it expects
1472   that the decorated function is called with a first argument that has
1473   a '_queue_filelock' argument.
1474
1475   @warning: Use this decorator only after locking.ssynchronized
1476
1477   Example::
1478     @locking.ssynchronized(_LOCK)
1479     @_RequireOpenQueue
1480     def Example(self):
1481       pass
1482
1483   """
1484   def wrapper(self, *args, **kwargs):
1485     # pylint: disable-msg=W0212
1486     assert self._queue_filelock is not None, "Queue should be open"
1487     return fn(self, *args, **kwargs)
1488   return wrapper
1489
1490
1491 class JobQueue(object):
1492   """Queue used to manage the jobs.
1493
1494   @cvar _RE_JOB_FILE: regex matching the valid job file names
1495
1496   """
1497   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
1498
1499   def __init__(self, context):
1500     """Constructor for JobQueue.
1501
1502     The constructor will initialize the job queue object and then
1503     start loading the current jobs from disk, either for starting them
1504     (if they were queue) or for aborting them (if they were already
1505     running).
1506
1507     @type context: GanetiContext
1508     @param context: the context object for access to the configuration
1509         data and other ganeti objects
1510
1511     """
1512     self.context = context
1513     self._memcache = weakref.WeakValueDictionary()
1514     self._my_hostname = netutils.Hostname.GetSysName()
1515
1516     # The Big JobQueue lock. If a code block or method acquires it in shared
1517     # mode safe it must guarantee concurrency with all the code acquiring it in
1518     # shared mode, including itself. In order not to acquire it at all
1519     # concurrency must be guaranteed with all code acquiring it in shared mode
1520     # and all code acquiring it exclusively.
1521     self._lock = locking.SharedLock("JobQueue")
1522
1523     self.acquire = self._lock.acquire
1524     self.release = self._lock.release
1525
1526     # Initialize the queue, and acquire the filelock.
1527     # This ensures no other process is working on the job queue.
1528     self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
1529
1530     # Read serial file
1531     self._last_serial = jstore.ReadSerial()
1532     assert self._last_serial is not None, ("Serial file was modified between"
1533                                            " check in jstore and here")
1534
1535     # Get initial list of nodes
1536     self._nodes = dict((n.name, n.primary_ip)
1537                        for n in self.context.cfg.GetAllNodesInfo().values()
1538                        if n.master_candidate)
1539
1540     # Remove master node
1541     self._nodes.pop(self._my_hostname, None)
1542
1543     # TODO: Check consistency across nodes
1544
1545     self._queue_size = 0
1546     self._UpdateQueueSizeUnlocked()
1547     self._drained = jstore.CheckDrainFlag()
1548
1549     # Job dependencies
1550     self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
1551                                         self._EnqueueJobs)
1552     self.context.glm.AddToLockMonitor(self.depmgr)
1553
1554     # Setup worker pool
1555     self._wpool = _JobQueueWorkerPool(self)
1556     try:
1557       self._InspectQueue()
1558     except:
1559       self._wpool.TerminateWorkers()
1560       raise
1561
1562   @locking.ssynchronized(_LOCK)
1563   @_RequireOpenQueue
1564   def _InspectQueue(self):
1565     """Loads the whole job queue and resumes unfinished jobs.
1566
1567     This function needs the lock here because WorkerPool.AddTask() may start a
1568     job while we're still doing our work.
1569
1570     """
1571     logging.info("Inspecting job queue")
1572
1573     restartjobs = []
1574
1575     all_job_ids = self._GetJobIDsUnlocked()
1576     jobs_count = len(all_job_ids)
1577     lastinfo = time.time()
1578     for idx, job_id in enumerate(all_job_ids):
1579       # Give an update every 1000 jobs or 10 seconds
1580       if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
1581           idx == (jobs_count - 1)):
1582         logging.info("Job queue inspection: %d/%d (%0.1f %%)",
1583                      idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
1584         lastinfo = time.time()
1585
1586       job = self._LoadJobUnlocked(job_id)
1587
1588       # a failure in loading the job can cause 'None' to be returned
1589       if job is None:
1590         continue
1591
1592       status = job.CalcStatus()
1593
1594       if status == constants.JOB_STATUS_QUEUED:
1595         restartjobs.append(job)
1596
1597       elif status in (constants.JOB_STATUS_RUNNING,
1598                       constants.JOB_STATUS_WAITING,
1599                       constants.JOB_STATUS_CANCELING):
1600         logging.warning("Unfinished job %s found: %s", job.id, job)
1601
1602         if status == constants.JOB_STATUS_WAITING:
1603           # Restart job
1604           job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
1605           restartjobs.append(job)
1606         else:
1607           job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1608                                 "Unclean master daemon shutdown")
1609           job.Finalize()
1610
1611         self.UpdateJobUnlocked(job)
1612
1613     if restartjobs:
1614       logging.info("Restarting %s jobs", len(restartjobs))
1615       self._EnqueueJobsUnlocked(restartjobs)
1616
1617     logging.info("Job queue inspection finished")
1618
1619   @locking.ssynchronized(_LOCK)
1620   @_RequireOpenQueue
1621   def AddNode(self, node):
1622     """Register a new node with the queue.
1623
1624     @type node: L{objects.Node}
1625     @param node: the node object to be added
1626
1627     """
1628     node_name = node.name
1629     assert node_name != self._my_hostname
1630
1631     # Clean queue directory on added node
1632     result = rpc.RpcRunner.call_jobqueue_purge(node_name)
1633     msg = result.fail_msg
1634     if msg:
1635       logging.warning("Cannot cleanup queue directory on node %s: %s",
1636                       node_name, msg)
1637
1638     if not node.master_candidate:
1639       # remove if existing, ignoring errors
1640       self._nodes.pop(node_name, None)
1641       # and skip the replication of the job ids
1642       return
1643
1644     # Upload the whole queue excluding archived jobs
1645     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1646
1647     # Upload current serial file
1648     files.append(constants.JOB_QUEUE_SERIAL_FILE)
1649
1650     for file_name in files:
1651       # Read file content
1652       content = utils.ReadFile(file_name)
1653
1654       result = rpc.RpcRunner.call_jobqueue_update([node_name],
1655                                                   [node.primary_ip],
1656                                                   file_name, content)
1657       msg = result[node_name].fail_msg
1658       if msg:
1659         logging.error("Failed to upload file %s to node %s: %s",
1660                       file_name, node_name, msg)
1661
1662     self._nodes[node_name] = node.primary_ip
1663
1664   @locking.ssynchronized(_LOCK)
1665   @_RequireOpenQueue
1666   def RemoveNode(self, node_name):
1667     """Callback called when removing nodes from the cluster.
1668
1669     @type node_name: str
1670     @param node_name: the name of the node to remove
1671
1672     """
1673     self._nodes.pop(node_name, None)
1674
1675   @staticmethod
1676   def _CheckRpcResult(result, nodes, failmsg):
1677     """Verifies the status of an RPC call.
1678
1679     Since we aim to keep consistency should this node (the current
1680     master) fail, we will log errors if our rpc fail, and especially
1681     log the case when more than half of the nodes fails.
1682
1683     @param result: the data as returned from the rpc call
1684     @type nodes: list
1685     @param nodes: the list of nodes we made the call to
1686     @type failmsg: str
1687     @param failmsg: the identifier to be used for logging
1688
1689     """
1690     failed = []
1691     success = []
1692
1693     for node in nodes:
1694       msg = result[node].fail_msg
1695       if msg:
1696         failed.append(node)
1697         logging.error("RPC call %s (%s) failed on node %s: %s",
1698                       result[node].call, failmsg, node, msg)
1699       else:
1700         success.append(node)
1701
1702     # +1 for the master node
1703     if (len(success) + 1) < len(failed):
1704       # TODO: Handle failing nodes
1705       logging.error("More than half of the nodes failed")
1706
1707   def _GetNodeIp(self):
1708     """Helper for returning the node name/ip list.
1709
1710     @rtype: (list, list)
1711     @return: a tuple of two lists, the first one with the node
1712         names and the second one with the node addresses
1713
1714     """
1715     # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1716     name_list = self._nodes.keys()
1717     addr_list = [self._nodes[name] for name in name_list]
1718     return name_list, addr_list
1719
1720   def _UpdateJobQueueFile(self, file_name, data, replicate):
1721     """Writes a file locally and then replicates it to all nodes.
1722
1723     This function will replace the contents of a file on the local
1724     node and then replicate it to all the other nodes we have.
1725
1726     @type file_name: str
1727     @param file_name: the path of the file to be replicated
1728     @type data: str
1729     @param data: the new contents of the file
1730     @type replicate: boolean
1731     @param replicate: whether to spread the changes to the remote nodes
1732
1733     """
1734     getents = runtime.GetEnts()
1735     utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1736                     gid=getents.masterd_gid)
1737
1738     if replicate:
1739       names, addrs = self._GetNodeIp()
1740       result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1741       self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1742
1743   def _RenameFilesUnlocked(self, rename):
1744     """Renames a file locally and then replicate the change.
1745
1746     This function will rename a file in the local queue directory
1747     and then replicate this rename to all the other nodes we have.
1748
1749     @type rename: list of (old, new)
1750     @param rename: List containing tuples mapping old to new names
1751
1752     """
1753     # Rename them locally
1754     for old, new in rename:
1755       utils.RenameFile(old, new, mkdir=True)
1756
1757     # ... and on all nodes
1758     names, addrs = self._GetNodeIp()
1759     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1760     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1761
1762   @staticmethod
1763   def _FormatJobID(job_id):
1764     """Convert a job ID to string format.
1765
1766     Currently this just does C{str(job_id)} after performing some
1767     checks, but if we want to change the job id format this will
1768     abstract this change.
1769
1770     @type job_id: int or long
1771     @param job_id: the numeric job id
1772     @rtype: str
1773     @return: the formatted job id
1774
1775     """
1776     if not isinstance(job_id, (int, long)):
1777       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
1778     if job_id < 0:
1779       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
1780
1781     return str(job_id)
1782
1783   @classmethod
1784   def _GetArchiveDirectory(cls, job_id):
1785     """Returns the archive directory for a job.
1786
1787     @type job_id: str
1788     @param job_id: Job identifier
1789     @rtype: str
1790     @return: Directory name
1791
1792     """
1793     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
1794
1795   def _NewSerialsUnlocked(self, count):
1796     """Generates a new job identifier.
1797
1798     Job identifiers are unique during the lifetime of a cluster.
1799
1800     @type count: integer
1801     @param count: how many serials to return
1802     @rtype: str
1803     @return: a string representing the job identifier.
1804
1805     """
1806     assert count > 0
1807     # New number
1808     serial = self._last_serial + count
1809
1810     # Write to file
1811     self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
1812                              "%s\n" % serial, True)
1813
1814     result = [self._FormatJobID(v)
1815               for v in range(self._last_serial + 1, serial + 1)]
1816
1817     # Keep it only if we were able to write the file
1818     self._last_serial = serial
1819
1820     assert len(result) == count
1821
1822     return result
1823
1824   @staticmethod
1825   def _GetJobPath(job_id):
1826     """Returns the job file for a given job id.
1827
1828     @type job_id: str
1829     @param job_id: the job identifier
1830     @rtype: str
1831     @return: the path to the job file
1832
1833     """
1834     return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
1835
1836   @classmethod
1837   def _GetArchivedJobPath(cls, job_id):
1838     """Returns the archived job file for a give job id.
1839
1840     @type job_id: str
1841     @param job_id: the job identifier
1842     @rtype: str
1843     @return: the path to the archived job file
1844
1845     """
1846     return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
1847                           cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
1848
1849   def _GetJobIDsUnlocked(self, sort=True):
1850     """Return all known job IDs.
1851
1852     The method only looks at disk because it's a requirement that all
1853     jobs are present on disk (so in the _memcache we don't have any
1854     extra IDs).
1855
1856     @type sort: boolean
1857     @param sort: perform sorting on the returned job ids
1858     @rtype: list
1859     @return: the list of job IDs
1860
1861     """
1862     jlist = []
1863     for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
1864       m = self._RE_JOB_FILE.match(filename)
1865       if m:
1866         jlist.append(m.group(1))
1867     if sort:
1868       jlist = utils.NiceSort(jlist)
1869     return jlist
1870
1871   def _LoadJobUnlocked(self, job_id):
1872     """Loads a job from the disk or memory.
1873
1874     Given a job id, this will return the cached job object if
1875     existing, or try to load the job from the disk. If loading from
1876     disk, it will also add the job to the cache.
1877
1878     @param job_id: the job id
1879     @rtype: L{_QueuedJob} or None
1880     @return: either None or the job object
1881
1882     """
1883     job = self._memcache.get(job_id, None)
1884     if job:
1885       logging.debug("Found job %s in memcache", job_id)
1886       assert job.writable, "Found read-only job in memcache"
1887       return job
1888
1889     try:
1890       job = self._LoadJobFromDisk(job_id, False)
1891       if job is None:
1892         return job
1893     except errors.JobFileCorrupted:
1894       old_path = self._GetJobPath(job_id)
1895       new_path = self._GetArchivedJobPath(job_id)
1896       if old_path == new_path:
1897         # job already archived (future case)
1898         logging.exception("Can't parse job %s", job_id)
1899       else:
1900         # non-archived case
1901         logging.exception("Can't parse job %s, will archive.", job_id)
1902         self._RenameFilesUnlocked([(old_path, new_path)])
1903       return None
1904
1905     assert job.writable, "Job just loaded is not writable"
1906
1907     self._memcache[job_id] = job
1908     logging.debug("Added job %s to the cache", job_id)
1909     return job
1910
1911   def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
1912     """Load the given job file from disk.
1913
1914     Given a job file, read, load and restore it in a _QueuedJob format.
1915
1916     @type job_id: string
1917     @param job_id: job identifier
1918     @type try_archived: bool
1919     @param try_archived: Whether to try loading an archived job
1920     @rtype: L{_QueuedJob} or None
1921     @return: either None or the job object
1922
1923     """
1924     path_functions = [(self._GetJobPath, True)]
1925
1926     if try_archived:
1927       path_functions.append((self._GetArchivedJobPath, False))
1928
1929     raw_data = None
1930     writable_default = None
1931
1932     for (fn, writable_default) in path_functions:
1933       filepath = fn(job_id)
1934       logging.debug("Loading job from %s", filepath)
1935       try:
1936         raw_data = utils.ReadFile(filepath)
1937       except EnvironmentError, err:
1938         if err.errno != errno.ENOENT:
1939           raise
1940       else:
1941         break
1942
1943     if not raw_data:
1944       return None
1945
1946     if writable is None:
1947       writable = writable_default
1948
1949     try:
1950       data = serializer.LoadJson(raw_data)
1951       job = _QueuedJob.Restore(self, data, writable)
1952     except Exception, err: # pylint: disable-msg=W0703
1953       raise errors.JobFileCorrupted(err)
1954
1955     return job
1956
1957   def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
1958     """Load the given job file from disk.
1959
1960     Given a job file, read, load and restore it in a _QueuedJob format.
1961     In case of error reading the job, it gets returned as None, and the
1962     exception is logged.
1963
1964     @type job_id: string
1965     @param job_id: job identifier
1966     @type try_archived: bool
1967     @param try_archived: Whether to try loading an archived job
1968     @rtype: L{_QueuedJob} or None
1969     @return: either None or the job object
1970
1971     """
1972     try:
1973       return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
1974     except (errors.JobFileCorrupted, EnvironmentError):
1975       logging.exception("Can't load/parse job %s", job_id)
1976       return None
1977
1978   def _UpdateQueueSizeUnlocked(self):
1979     """Update the queue size.
1980
1981     """
1982     self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1983
1984   @locking.ssynchronized(_LOCK)
1985   @_RequireOpenQueue
1986   def SetDrainFlag(self, drain_flag):
1987     """Sets the drain flag for the queue.
1988
1989     @type drain_flag: boolean
1990     @param drain_flag: Whether to set or unset the drain flag
1991
1992     """
1993     jstore.SetDrainFlag(drain_flag)
1994
1995     self._drained = drain_flag
1996
1997     return True
1998
1999   @_RequireOpenQueue
2000   def _SubmitJobUnlocked(self, job_id, ops):
2001     """Create and store a new job.
2002
2003     This enters the job into our job queue and also puts it on the new
2004     queue, in order for it to be picked up by the queue processors.
2005
2006     @type job_id: job ID
2007     @param job_id: the job ID for the new job
2008     @type ops: list
2009     @param ops: The list of OpCodes that will become the new job.
2010     @rtype: L{_QueuedJob}
2011     @return: the job object to be queued
2012     @raise errors.JobQueueDrainError: if the job queue is marked for draining
2013     @raise errors.JobQueueFull: if the job queue has too many jobs in it
2014     @raise errors.GenericError: If an opcode is not valid
2015
2016     """
2017     # Ok when sharing the big job queue lock, as the drain file is created when
2018     # the lock is exclusive.
2019     if self._drained:
2020       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
2021
2022     if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
2023       raise errors.JobQueueFull()
2024
2025     job = _QueuedJob(self, job_id, ops, True)
2026
2027     # Check priority
2028     for idx, op in enumerate(job.ops):
2029       if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
2030         allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2031         raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
2032                                   " are %s" % (idx, op.priority, allowed))
2033
2034       dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
2035       if not opcodes.TNoRelativeJobDependencies(dependencies):
2036         raise errors.GenericError("Opcode %s has invalid dependencies, must"
2037                                   " match %s: %s" %
2038                                   (idx, opcodes.TNoRelativeJobDependencies,
2039                                    dependencies))
2040
2041     # Write to disk
2042     self.UpdateJobUnlocked(job)
2043
2044     self._queue_size += 1
2045
2046     logging.debug("Adding new job %s to the cache", job_id)
2047     self._memcache[job_id] = job
2048
2049     return job
2050
2051   @locking.ssynchronized(_LOCK)
2052   @_RequireOpenQueue
2053   def SubmitJob(self, ops):
2054     """Create and store a new job.
2055
2056     @see: L{_SubmitJobUnlocked}
2057
2058     """
2059     (job_id, ) = self._NewSerialsUnlocked(1)
2060     self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
2061     return job_id
2062
2063   @locking.ssynchronized(_LOCK)
2064   @_RequireOpenQueue
2065   def SubmitManyJobs(self, jobs):
2066     """Create and store multiple jobs.
2067
2068     @see: L{_SubmitJobUnlocked}
2069
2070     """
2071     all_job_ids = self._NewSerialsUnlocked(len(jobs))
2072
2073     (results, added_jobs) = \
2074       self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])
2075
2076     self._EnqueueJobsUnlocked(added_jobs)
2077
2078     return results
2079
2080   @staticmethod
2081   def _FormatSubmitError(msg, ops):
2082     """Formats errors which occurred while submitting a job.
2083
2084     """
2085     return ("%s; opcodes %s" %
2086             (msg, utils.CommaJoin(op.Summary() for op in ops)))
2087
2088   @staticmethod
2089   def _ResolveJobDependencies(resolve_fn, deps):
2090     """Resolves relative job IDs in dependencies.
2091
2092     @type resolve_fn: callable
2093     @param resolve_fn: Function to resolve a relative job ID
2094     @type deps: list
2095     @param deps: Dependencies
2096     @rtype: list
2097     @return: Resolved dependencies
2098
2099     """
2100     result = []
2101
2102     for (dep_job_id, dep_status) in deps:
2103       if ht.TRelativeJobId(dep_job_id):
2104         assert ht.TInt(dep_job_id) and dep_job_id < 0
2105         try:
2106           job_id = resolve_fn(dep_job_id)
2107         except IndexError:
2108           # Abort
2109           return (False, "Unable to resolve relative job ID %s" % dep_job_id)
2110       else:
2111         job_id = dep_job_id
2112
2113       result.append((job_id, dep_status))
2114
2115     return (True, result)
2116
2117   def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
2118     """Create and store multiple jobs.
2119
2120     @see: L{_SubmitJobUnlocked}
2121
2122     """
2123     results = []
2124     added_jobs = []
2125
2126     def resolve_fn(job_idx, reljobid):
2127       assert reljobid < 0
2128       return (previous_job_ids + job_ids[:job_idx])[reljobid]
2129
2130     for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
2131       for op in ops:
2132         if getattr(op, opcodes.DEPEND_ATTR, None):
2133           (status, data) = \
2134             self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
2135                                          op.depends)
2136           if not status:
2137             # Abort resolving dependencies
2138             assert ht.TNonEmptyString(data), "No error message"
2139             break
2140           # Use resolved dependencies
2141           op.depends = data
2142       else:
2143         try:
2144           job = self._SubmitJobUnlocked(job_id, ops)
2145         except errors.GenericError, err:
2146           status = False
2147           data = self._FormatSubmitError(str(err), ops)
2148         else:
2149           status = True
2150           data = job_id
2151           added_jobs.append(job)
2152
2153       results.append((status, data))
2154
2155     return (results, added_jobs)
2156
2157   @locking.ssynchronized(_LOCK)
2158   def _EnqueueJobs(self, jobs):
2159     """Helper function to add jobs to worker pool's queue.
2160
2161     @type jobs: list
2162     @param jobs: List of all jobs
2163
2164     """
2165     return self._EnqueueJobsUnlocked(jobs)
2166
2167   def _EnqueueJobsUnlocked(self, jobs):
2168     """Helper function to add jobs to worker pool's queue.
2169
2170     @type jobs: list
2171     @param jobs: List of all jobs
2172
2173     """
2174     assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
2175     self._wpool.AddManyTasks([(job, ) for job in jobs],
2176                              priority=[job.CalcPriority() for job in jobs])
2177
2178   def _GetJobStatusForDependencies(self, job_id):
2179     """Gets the status of a job for dependencies.
2180
2181     @type job_id: string
2182     @param job_id: Job ID
2183     @raise errors.JobLost: If job can't be found
2184
2185     """
2186     if not isinstance(job_id, basestring):
2187       job_id = self._FormatJobID(job_id)
2188
2189     # Not using in-memory cache as doing so would require an exclusive lock
2190
2191     # Try to load from disk
2192     job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
2193
2194     assert not job.writable, "Got writable job"
2195
2196     if job:
2197       return job.CalcStatus()
2198
2199     raise errors.JobLost("Job %s not found" % job_id)
2200
2201   @_RequireOpenQueue
2202   def UpdateJobUnlocked(self, job, replicate=True):
2203     """Update a job's on disk storage.
2204
2205     After a job has been modified, this function needs to be called in
2206     order to write the changes to disk and replicate them to the other
2207     nodes.
2208
2209     @type job: L{_QueuedJob}
2210     @param job: the changed job
2211     @type replicate: boolean
2212     @param replicate: whether to replicate the change to remote nodes
2213
2214     """
2215     if __debug__:
2216       finalized = job.CalcStatus() in constants.JOBS_FINALIZED
2217       assert (finalized ^ (job.end_timestamp is None))
2218       assert job.writable, "Can't update read-only job"
2219
2220     filename = self._GetJobPath(job.id)
2221     data = serializer.DumpJson(job.Serialize(), indent=False)
2222     logging.debug("Writing job %s to %s", job.id, filename)
2223     self._UpdateJobQueueFile(filename, data, replicate)
2224
2225   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
2226                         timeout):
2227     """Waits for changes in a job.
2228
2229     @type job_id: string
2230     @param job_id: Job identifier
2231     @type fields: list of strings
2232     @param fields: Which fields to check for changes
2233     @type prev_job_info: list or None
2234     @param prev_job_info: Last job information returned
2235     @type prev_log_serial: int
2236     @param prev_log_serial: Last job message serial number
2237     @type timeout: float
2238     @param timeout: maximum time to wait in seconds
2239     @rtype: tuple (job info, log entries)
2240     @return: a tuple of the job information as required via
2241         the fields parameter, and the log entries as a list
2242
2243         if the job has not changed and the timeout has expired,
2244         we instead return a special value,
2245         L{constants.JOB_NOTCHANGED}, which should be interpreted
2246         as such by the clients
2247
2248     """
2249     load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, False,
2250                              writable=False)
2251
2252     helper = _WaitForJobChangesHelper()
2253
2254     return helper(self._GetJobPath(job_id), load_fn,
2255                   fields, prev_job_info, prev_log_serial, timeout)
2256
2257   @locking.ssynchronized(_LOCK)
2258   @_RequireOpenQueue
2259   def CancelJob(self, job_id):
2260     """Cancels a job.
2261
2262     This will only succeed if the job has not started yet.
2263
2264     @type job_id: string
2265     @param job_id: job ID of job to be cancelled.
2266
2267     """
2268     logging.info("Cancelling job %s", job_id)
2269
2270     job = self._LoadJobUnlocked(job_id)
2271     if not job:
2272       logging.debug("Job %s not found", job_id)
2273       return (False, "Job %s not found" % job_id)
2274
2275     assert job.writable, "Can't cancel read-only job"
2276
2277     (success, msg) = job.Cancel()
2278
2279     if success:
2280       # If the job was finalized (e.g. cancelled), this is the final write
2281       # allowed. The job can be archived anytime.
2282       self.UpdateJobUnlocked(job)
2283
2284     return (success, msg)
2285
2286   @_RequireOpenQueue
2287   def _ArchiveJobsUnlocked(self, jobs):
2288     """Archives jobs.
2289
2290     @type jobs: list of L{_QueuedJob}
2291     @param jobs: Job objects
2292     @rtype: int
2293     @return: Number of archived jobs
2294
2295     """
2296     archive_jobs = []
2297     rename_files = []
2298     for job in jobs:
2299       assert job.writable, "Can't archive read-only job"
2300
2301       if job.CalcStatus() not in constants.JOBS_FINALIZED:
2302         logging.debug("Job %s is not yet done", job.id)
2303         continue
2304
2305       archive_jobs.append(job)
2306
2307       old = self._GetJobPath(job.id)
2308       new = self._GetArchivedJobPath(job.id)
2309       rename_files.append((old, new))
2310
2311     # TODO: What if 1..n files fail to rename?
2312     self._RenameFilesUnlocked(rename_files)
2313
2314     logging.debug("Successfully archived job(s) %s",
2315                   utils.CommaJoin(job.id for job in archive_jobs))
2316
2317     # Since we haven't quite checked, above, if we succeeded or failed renaming
2318     # the files, we update the cached queue size from the filesystem. When we
2319     # get around to fix the TODO: above, we can use the number of actually
2320     # archived jobs to fix this.
2321     self._UpdateQueueSizeUnlocked()
2322     return len(archive_jobs)
2323
2324   @locking.ssynchronized(_LOCK)
2325   @_RequireOpenQueue
2326   def ArchiveJob(self, job_id):
2327     """Archives a job.
2328
2329     This is just a wrapper over L{_ArchiveJobsUnlocked}.
2330
2331     @type job_id: string
2332     @param job_id: Job ID of job to be archived.
2333     @rtype: bool
2334     @return: Whether job was archived
2335
2336     """
2337     logging.info("Archiving job %s", job_id)
2338
2339     job = self._LoadJobUnlocked(job_id)
2340     if not job:
2341       logging.debug("Job %s not found", job_id)
2342       return False
2343
2344     return self._ArchiveJobsUnlocked([job]) == 1
2345
2346   @locking.ssynchronized(_LOCK)
2347   @_RequireOpenQueue
2348   def AutoArchiveJobs(self, age, timeout):
2349     """Archives all jobs based on age.
2350
2351     The method will archive all jobs which are older than the age
2352     parameter. For jobs that don't have an end timestamp, the start
2353     timestamp will be considered. The special '-1' age will cause
2354     archival of all jobs (that are not running or queued).
2355
2356     @type age: int
2357     @param age: the minimum age in seconds
2358
2359     """
2360     logging.info("Archiving jobs with age more than %s seconds", age)
2361
2362     now = time.time()
2363     end_time = now + timeout
2364     archived_count = 0
2365     last_touched = 0
2366
2367     all_job_ids = self._GetJobIDsUnlocked()
2368     pending = []
2369     for idx, job_id in enumerate(all_job_ids):
2370       last_touched = idx + 1
2371
2372       # Not optimal because jobs could be pending
2373       # TODO: Measure average duration for job archival and take number of
2374       # pending jobs into account.
2375       if time.time() > end_time:
2376         break
2377
2378       # Returns None if the job failed to load
2379       job = self._LoadJobUnlocked(job_id)
2380       if job:
2381         if job.end_timestamp is None:
2382           if job.start_timestamp is None:
2383             job_age = job.received_timestamp
2384           else:
2385             job_age = job.start_timestamp
2386         else:
2387           job_age = job.end_timestamp
2388
2389         if age == -1 or now - job_age[0] > age:
2390           pending.append(job)
2391
2392           # Archive 10 jobs at a time
2393           if len(pending) >= 10:
2394             archived_count += self._ArchiveJobsUnlocked(pending)
2395             pending = []
2396
2397     if pending:
2398       archived_count += self._ArchiveJobsUnlocked(pending)
2399
2400     return (archived_count, len(all_job_ids) - last_touched)
2401
2402   def QueryJobs(self, job_ids, fields):
2403     """Returns a list of jobs in queue.
2404
2405     @type job_ids: list
2406     @param job_ids: sequence of job identifiers or None for all
2407     @type fields: list
2408     @param fields: names of fields to return
2409     @rtype: list
2410     @return: list one element per job, each element being list with
2411         the requested fields
2412
2413     """
2414     jobs = []
2415     list_all = False
2416     if not job_ids:
2417       # Since files are added to/removed from the queue atomically, there's no
2418       # risk of getting the job ids in an inconsistent state.
2419       job_ids = self._GetJobIDsUnlocked()
2420       list_all = True
2421
2422     for job_id in job_ids:
2423       job = self.SafeLoadJobFromDisk(job_id, True)
2424       if job is not None:
2425         jobs.append(job.GetInfo(fields))
2426       elif not list_all:
2427         jobs.append(None)
2428
2429     return jobs
2430
2431   @locking.ssynchronized(_LOCK)
2432   @_RequireOpenQueue
2433   def Shutdown(self):
2434     """Stops the job queue.
2435
2436     This shutdowns all the worker threads an closes the queue.
2437
2438     """
2439     self._wpool.TerminateWorkers()
2440
2441     self._queue_filelock.Close()
2442     self._queue_filelock = None