jqueue: Log progress and load jobs one by one
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import os
33 import logging
34 import threading
35 import errno
36 import re
37 import time
38 import weakref
39
40 from ganeti import constants
41 from ganeti import serializer
42 from ganeti import workerpool
43 from ganeti import opcodes
44 from ganeti import errors
45 from ganeti import mcpu
46 from ganeti import utils
47 from ganeti import jstore
48 from ganeti import rpc
49
50 JOBQUEUE_THREADS = 25
51
52
53 def TimeStampNow():
54   """Returns the current timestamp.
55
56   @rtype: tuple
57   @return: the current time in the (seconds, microseconds) format
58
59   """
60   return utils.SplitTime(time.time())
61
62
63 class _QueuedOpCode(object):
64   """Encasulates an opcode object.
65
66   @ivar log: holds the execution log and consists of tuples
67   of the form C{(log_serial, timestamp, level, message)}
68   @ivar input: the OpCode we encapsulate
69   @ivar status: the current status
70   @ivar result: the result of the LU execution
71   @ivar start_timestamp: timestamp for the start of the execution
72   @ivar stop_timestamp: timestamp for the end of the execution
73
74   """
75   def __init__(self, op):
76     """Constructor for the _QuededOpCode.
77
78     @type op: L{opcodes.OpCode}
79     @param op: the opcode we encapsulate
80
81     """
82     self.input = op
83     self.status = constants.OP_STATUS_QUEUED
84     self.result = None
85     self.log = []
86     self.start_timestamp = None
87     self.end_timestamp = None
88
89   @classmethod
90   def Restore(cls, state):
91     """Restore the _QueuedOpCode from the serialized form.
92
93     @type state: dict
94     @param state: the serialized state
95     @rtype: _QueuedOpCode
96     @return: a new _QueuedOpCode instance
97
98     """
99     obj = _QueuedOpCode.__new__(cls)
100     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
101     obj.status = state["status"]
102     obj.result = state["result"]
103     obj.log = state["log"]
104     obj.start_timestamp = state.get("start_timestamp", None)
105     obj.end_timestamp = state.get("end_timestamp", None)
106     return obj
107
108   def Serialize(self):
109     """Serializes this _QueuedOpCode.
110
111     @rtype: dict
112     @return: the dictionary holding the serialized state
113
114     """
115     return {
116       "input": self.input.__getstate__(),
117       "status": self.status,
118       "result": self.result,
119       "log": self.log,
120       "start_timestamp": self.start_timestamp,
121       "end_timestamp": self.end_timestamp,
122       }
123
124
125 class _QueuedJob(object):
126   """In-memory job representation.
127
128   This is what we use to track the user-submitted jobs. Locking must
129   be taken care of by users of this class.
130
131   @type queue: L{JobQueue}
132   @ivar queue: the parent queue
133   @ivar id: the job ID
134   @type ops: list
135   @ivar ops: the list of _QueuedOpCode that constitute the job
136   @type run_op_index: int
137   @ivar run_op_index: the currently executing opcode, or -1 if
138       we didn't yet start executing
139   @type log_serial: int
140   @ivar log_serial: holds the index for the next log entry
141   @ivar received_timestamp: the timestamp for when the job was received
142   @ivar start_timestmap: the timestamp for start of execution
143   @ivar end_timestamp: the timestamp for end of execution
144   @ivar change: a Condition variable we use for waiting for job changes
145
146   """
147   def __init__(self, queue, job_id, ops):
148     """Constructor for the _QueuedJob.
149
150     @type queue: L{JobQueue}
151     @param queue: our parent queue
152     @type job_id: job_id
153     @param job_id: our job id
154     @type ops: list
155     @param ops: the list of opcodes we hold, which will be encapsulated
156         in _QueuedOpCodes
157
158     """
159     if not ops:
160       # TODO: use a better exception
161       raise Exception("No opcodes")
162
163     self.queue = queue
164     self.id = job_id
165     self.ops = [_QueuedOpCode(op) for op in ops]
166     self.run_op_index = -1
167     self.log_serial = 0
168     self.received_timestamp = TimeStampNow()
169     self.start_timestamp = None
170     self.end_timestamp = None
171
172     # Condition to wait for changes
173     self.change = threading.Condition(self.queue._lock)
174
175   @classmethod
176   def Restore(cls, queue, state):
177     """Restore a _QueuedJob from serialized state:
178
179     @type queue: L{JobQueue}
180     @param queue: to which queue the restored job belongs
181     @type state: dict
182     @param state: the serialized state
183     @rtype: _JobQueue
184     @return: the restored _JobQueue instance
185
186     """
187     obj = _QueuedJob.__new__(cls)
188     obj.queue = queue
189     obj.id = state["id"]
190     obj.run_op_index = state["run_op_index"]
191     obj.received_timestamp = state.get("received_timestamp", None)
192     obj.start_timestamp = state.get("start_timestamp", None)
193     obj.end_timestamp = state.get("end_timestamp", None)
194
195     obj.ops = []
196     obj.log_serial = 0
197     for op_state in state["ops"]:
198       op = _QueuedOpCode.Restore(op_state)
199       for log_entry in op.log:
200         obj.log_serial = max(obj.log_serial, log_entry[0])
201       obj.ops.append(op)
202
203     # Condition to wait for changes
204     obj.change = threading.Condition(obj.queue._lock)
205
206     return obj
207
208   def Serialize(self):
209     """Serialize the _JobQueue instance.
210
211     @rtype: dict
212     @return: the serialized state
213
214     """
215     return {
216       "id": self.id,
217       "ops": [op.Serialize() for op in self.ops],
218       "run_op_index": self.run_op_index,
219       "start_timestamp": self.start_timestamp,
220       "end_timestamp": self.end_timestamp,
221       "received_timestamp": self.received_timestamp,
222       }
223
224   def CalcStatus(self):
225     """Compute the status of this job.
226
227     This function iterates over all the _QueuedOpCodes in the job and
228     based on their status, computes the job status.
229
230     The algorithm is:
231       - if we find a cancelled, or finished with error, the job
232         status will be the same
233       - otherwise, the last opcode with the status one of:
234           - waitlock
235           - running
236
237         will determine the job status
238
239       - otherwise, it means either all opcodes are queued, or success,
240         and the job status will be the same
241
242     @return: the job status
243
244     """
245     status = constants.JOB_STATUS_QUEUED
246
247     all_success = True
248     for op in self.ops:
249       if op.status == constants.OP_STATUS_SUCCESS:
250         continue
251
252       all_success = False
253
254       if op.status == constants.OP_STATUS_QUEUED:
255         pass
256       elif op.status == constants.OP_STATUS_WAITLOCK:
257         status = constants.JOB_STATUS_WAITLOCK
258       elif op.status == constants.OP_STATUS_RUNNING:
259         status = constants.JOB_STATUS_RUNNING
260       elif op.status == constants.OP_STATUS_ERROR:
261         status = constants.JOB_STATUS_ERROR
262         # The whole job fails if one opcode failed
263         break
264       elif op.status == constants.OP_STATUS_CANCELED:
265         status = constants.OP_STATUS_CANCELED
266         break
267
268     if all_success:
269       status = constants.JOB_STATUS_SUCCESS
270
271     return status
272
273   def GetLogEntries(self, newer_than):
274     """Selectively returns the log entries.
275
276     @type newer_than: None or int
277     @param newer_than: if this is None, return all log enties,
278         otherwise return only the log entries with serial higher
279         than this value
280     @rtype: list
281     @return: the list of the log entries selected
282
283     """
284     if newer_than is None:
285       serial = -1
286     else:
287       serial = newer_than
288
289     entries = []
290     for op in self.ops:
291       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
292
293     return entries
294
295
296 class _JobQueueWorker(workerpool.BaseWorker):
297   """The actual job workers.
298
299   """
300   def _NotifyStart(self):
301     """Mark the opcode as running, not lock-waiting.
302
303     This is called from the mcpu code as a notifier function, when the
304     LU is finally about to start the Exec() method. Of course, to have
305     end-user visible results, the opcode must be initially (before
306     calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
307
308     """
309     assert self.queue, "Queue attribute is missing"
310     assert self.opcode, "Opcode attribute is missing"
311
312     self.queue.acquire()
313     try:
314       self.opcode.status = constants.OP_STATUS_RUNNING
315     finally:
316       self.queue.release()
317
318   def RunTask(self, job):
319     """Job executor.
320
321     This functions processes a job. It is closely tied to the _QueuedJob and
322     _QueuedOpCode classes.
323
324     @type job: L{_QueuedJob}
325     @param job: the job to be processed
326
327     """
328     logging.debug("Worker %s processing job %s",
329                   self.worker_id, job.id)
330     proc = mcpu.Processor(self.pool.queue.context)
331     self.queue = queue = job.queue
332     try:
333       try:
334         count = len(job.ops)
335         for idx, op in enumerate(job.ops):
336           try:
337             logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
338
339             queue.acquire()
340             try:
341               job.run_op_index = idx
342               op.status = constants.OP_STATUS_WAITLOCK
343               op.result = None
344               op.start_timestamp = TimeStampNow()
345               if idx == 0: # first opcode
346                 job.start_timestamp = op.start_timestamp
347               queue.UpdateJobUnlocked(job)
348
349               input_opcode = op.input
350             finally:
351               queue.release()
352
353             def _Log(*args):
354               """Append a log entry.
355
356               """
357               assert len(args) < 3
358
359               if len(args) == 1:
360                 log_type = constants.ELOG_MESSAGE
361                 log_msg = args[0]
362               else:
363                 log_type, log_msg = args
364
365               # The time is split to make serialization easier and not lose
366               # precision.
367               timestamp = utils.SplitTime(time.time())
368
369               queue.acquire()
370               try:
371                 job.log_serial += 1
372                 op.log.append((job.log_serial, timestamp, log_type, log_msg))
373
374                 job.change.notifyAll()
375               finally:
376                 queue.release()
377
378             # Make sure not to hold lock while _Log is called
379             self.opcode = op
380             result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
381
382             queue.acquire()
383             try:
384               op.status = constants.OP_STATUS_SUCCESS
385               op.result = result
386               op.end_timestamp = TimeStampNow()
387               queue.UpdateJobUnlocked(job)
388             finally:
389               queue.release()
390
391             logging.debug("Op %s/%s: Successfully finished %s",
392                           idx + 1, count, op)
393           except Exception, err:
394             queue.acquire()
395             try:
396               try:
397                 op.status = constants.OP_STATUS_ERROR
398                 op.result = str(err)
399                 op.end_timestamp = TimeStampNow()
400                 logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
401               finally:
402                 queue.UpdateJobUnlocked(job)
403             finally:
404               queue.release()
405             raise
406
407       except errors.GenericError, err:
408         logging.exception("Ganeti exception")
409       except:
410         logging.exception("Unhandled exception")
411     finally:
412       queue.acquire()
413       try:
414         try:
415           job.run_op_idx = -1
416           job.end_timestamp = TimeStampNow()
417           queue.UpdateJobUnlocked(job)
418         finally:
419           job_id = job.id
420           status = job.CalcStatus()
421       finally:
422         queue.release()
423       logging.debug("Worker %s finished job %s, status = %s",
424                     self.worker_id, job_id, status)
425
426
427 class _JobQueueWorkerPool(workerpool.WorkerPool):
428   """Simple class implementing a job-processing workerpool.
429
430   """
431   def __init__(self, queue):
432     super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
433                                               _JobQueueWorker)
434     self.queue = queue
435
436
437 class JobQueue(object):
438   """Quue used to manaage the jobs.
439
440   @cvar _RE_JOB_FILE: regex matching the valid job file names
441
442   """
443   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
444
445   def _RequireOpenQueue(fn):
446     """Decorator for "public" functions.
447
448     This function should be used for all 'public' functions. That is,
449     functions usually called from other classes.
450
451     @warning: Use this decorator only after utils.LockedMethod!
452
453     Example::
454       @utils.LockedMethod
455       @_RequireOpenQueue
456       def Example(self):
457         pass
458
459     """
460     def wrapper(self, *args, **kwargs):
461       assert self._queue_lock is not None, "Queue should be open"
462       return fn(self, *args, **kwargs)
463     return wrapper
464
465   def __init__(self, context):
466     """Constructor for JobQueue.
467
468     The constructor will initialize the job queue object and then
469     start loading the current jobs from disk, either for starting them
470     (if they were queue) or for aborting them (if they were already
471     running).
472
473     @type context: GanetiContext
474     @param context: the context object for access to the configuration
475         data and other ganeti objects
476
477     """
478     self.context = context
479     self._memcache = weakref.WeakValueDictionary()
480     self._my_hostname = utils.HostInfo().name
481
482     # Locking
483     self._lock = threading.Lock()
484     self.acquire = self._lock.acquire
485     self.release = self._lock.release
486
487     # Initialize
488     self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
489
490     # Read serial file
491     self._last_serial = jstore.ReadSerial()
492     assert self._last_serial is not None, ("Serial file was modified between"
493                                            " check in jstore and here")
494
495     # Get initial list of nodes
496     self._nodes = dict((n.name, n.primary_ip)
497                        for n in self.context.cfg.GetAllNodesInfo().values())
498
499     # Remove master node
500     try:
501       del self._nodes[self._my_hostname]
502     except ValueError:
503       pass
504
505     # TODO: Check consistency across nodes
506
507     # Setup worker pool
508     self._wpool = _JobQueueWorkerPool(self)
509     try:
510       # We need to lock here because WorkerPool.AddTask() may start a job while
511       # we're still doing our work.
512       self.acquire()
513       try:
514         logging.info("Inspecting job queue")
515
516         all_job_ids = self._GetJobIDsUnlocked()
517         lastinfo = time.time()
518         for idx, job_id in enumerate(all_job_ids):
519           # Give an update every 1000 jobs or 10 seconds
520           if idx % 1000 == 0 or time.time() >= (lastinfo + 10.0):
521             jobs_count = len(all_job_ids)
522             logging.info("Job queue inspection: %d/%d (%0.1f %%)",
523                          idx, jobs_count, 100.0 * (idx + 1) / jobs_count)
524             lastinfo = time.time()
525
526           job = self._LoadJobUnlocked(job_id)
527
528           # a failure in loading the job can cause 'None' to be returned
529           if job is None:
530             continue
531
532           status = job.CalcStatus()
533
534           if status in (constants.JOB_STATUS_QUEUED, ):
535             self._wpool.AddTask(job)
536
537           elif status in (constants.JOB_STATUS_RUNNING,
538                           constants.JOB_STATUS_WAITLOCK):
539             logging.warning("Unfinished job %s found: %s", job.id, job)
540             try:
541               for op in job.ops:
542                 op.status = constants.OP_STATUS_ERROR
543                 op.result = "Unclean master daemon shutdown"
544             finally:
545               self.UpdateJobUnlocked(job)
546
547         logging.info("Job queue inspection finished")
548       finally:
549         self.release()
550     except:
551       self._wpool.TerminateWorkers()
552       raise
553
554   @utils.LockedMethod
555   @_RequireOpenQueue
556   def AddNode(self, node):
557     """Register a new node with the queue.
558
559     @type node: L{objects.Node}
560     @param node: the node object to be added
561
562     """
563     node_name = node.name
564     assert node_name != self._my_hostname
565
566     # Clean queue directory on added node
567     rpc.RpcRunner.call_jobqueue_purge(node_name)
568
569     # Upload the whole queue excluding archived jobs
570     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
571
572     # Upload current serial file
573     files.append(constants.JOB_QUEUE_SERIAL_FILE)
574
575     for file_name in files:
576       # Read file content
577       fd = open(file_name, "r")
578       try:
579         content = fd.read()
580       finally:
581         fd.close()
582
583       result = rpc.RpcRunner.call_jobqueue_update([node_name],
584                                                   [node.primary_ip],
585                                                   file_name, content)
586       if not result[node_name]:
587         logging.error("Failed to upload %s to %s", file_name, node_name)
588
589     self._nodes[node_name] = node.primary_ip
590
591   @utils.LockedMethod
592   @_RequireOpenQueue
593   def RemoveNode(self, node_name):
594     """Callback called when removing nodes from the cluster.
595
596     @type node_name: str
597     @param node_name: the name of the node to remove
598
599     """
600     try:
601       # The queue is removed by the "leave node" RPC call.
602       del self._nodes[node_name]
603     except KeyError:
604       pass
605
606   def _CheckRpcResult(self, result, nodes, failmsg):
607     """Verifies the status of an RPC call.
608
609     Since we aim to keep consistency should this node (the current
610     master) fail, we will log errors if our rpc fail, and especially
611     log the case when more than half of the nodes failes.
612
613     @param result: the data as returned from the rpc call
614     @type nodes: list
615     @param nodes: the list of nodes we made the call to
616     @type failmsg: str
617     @param failmsg: the identifier to be used for logging
618
619     """
620     failed = []
621     success = []
622
623     for node in nodes:
624       if result[node]:
625         success.append(node)
626       else:
627         failed.append(node)
628
629     if failed:
630       logging.error("%s failed on %s", failmsg, ", ".join(failed))
631
632     # +1 for the master node
633     if (len(success) + 1) < len(failed):
634       # TODO: Handle failing nodes
635       logging.error("More than half of the nodes failed")
636
637   def _GetNodeIp(self):
638     """Helper for returning the node name/ip list.
639
640     @rtype: (list, list)
641     @return: a tuple of two lists, the first one with the node
642         names and the second one with the node addresses
643
644     """
645     name_list = self._nodes.keys()
646     addr_list = [self._nodes[name] for name in name_list]
647     return name_list, addr_list
648
649   def _WriteAndReplicateFileUnlocked(self, file_name, data):
650     """Writes a file locally and then replicates it to all nodes.
651
652     This function will replace the contents of a file on the local
653     node and then replicate it to all the other nodes we have.
654
655     @type file_name: str
656     @param file_name: the path of the file to be replicated
657     @type data: str
658     @param data: the new contents of the file
659
660     """
661     utils.WriteFile(file_name, data=data)
662
663     names, addrs = self._GetNodeIp()
664     result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
665     self._CheckRpcResult(result, self._nodes,
666                          "Updating %s" % file_name)
667
668   def _RenameFileUnlocked(self, old, new):
669     """Renames a file locally and then replicate the change.
670
671     This function will rename a file in the local queue directory
672     and then replicate this rename to all the other nodes we have.
673
674     @type old: str
675     @param old: the current name of the file
676     @type new: str
677     @param new: the new name of the file
678
679     """
680     os.rename(old, new)
681
682     names, addrs = self._GetNodeIp()
683     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, old, new)
684     self._CheckRpcResult(result, self._nodes,
685                          "Moving %s to %s" % (old, new))
686
687   def _FormatJobID(self, job_id):
688     """Convert a job ID to string format.
689
690     Currently this just does C{str(job_id)} after performing some
691     checks, but if we want to change the job id format this will
692     abstract this change.
693
694     @type job_id: int or long
695     @param job_id: the numeric job id
696     @rtype: str
697     @return: the formatted job id
698
699     """
700     if not isinstance(job_id, (int, long)):
701       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
702     if job_id < 0:
703       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
704
705     return str(job_id)
706
707   def _NewSerialUnlocked(self):
708     """Generates a new job identifier.
709
710     Job identifiers are unique during the lifetime of a cluster.
711
712     @rtype: str
713     @return: a string representing the job identifier.
714
715     """
716     # New number
717     serial = self._last_serial + 1
718
719     # Write to file
720     self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
721                                         "%s\n" % serial)
722
723     # Keep it only if we were able to write the file
724     self._last_serial = serial
725
726     return self._FormatJobID(serial)
727
728   @staticmethod
729   def _GetJobPath(job_id):
730     """Returns the job file for a given job id.
731
732     @type job_id: str
733     @param job_id: the job identifier
734     @rtype: str
735     @return: the path to the job file
736
737     """
738     return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
739
740   @staticmethod
741   def _GetArchivedJobPath(job_id):
742     """Returns the archived job file for a give job id.
743
744     @type job_id: str
745     @param job_id: the job identifier
746     @rtype: str
747     @return: the path to the archived job file
748
749     """
750     return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
751
752   @classmethod
753   def _ExtractJobID(cls, name):
754     """Extract the job id from a filename.
755
756     @type name: str
757     @param name: the job filename
758     @rtype: job id or None
759     @return: the job id corresponding to the given filename,
760         or None if the filename does not represent a valid
761         job file
762
763     """
764     m = cls._RE_JOB_FILE.match(name)
765     if m:
766       return m.group(1)
767     else:
768       return None
769
770   def _GetJobIDsUnlocked(self, archived=False):
771     """Return all known job IDs.
772
773     If the parameter archived is True, archived jobs IDs will be
774     included. Currently this argument is unused.
775
776     The method only looks at disk because it's a requirement that all
777     jobs are present on disk (so in the _memcache we don't have any
778     extra IDs).
779
780     @rtype: list
781     @return: the list of job IDs
782
783     """
784     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
785     jlist = utils.NiceSort(jlist)
786     return jlist
787
788   def _ListJobFiles(self):
789     """Returns the list of current job files.
790
791     @rtype: list
792     @return: the list of job file names
793
794     """
795     return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
796             if self._RE_JOB_FILE.match(name)]
797
798   def _LoadJobUnlocked(self, job_id):
799     """Loads a job from the disk or memory.
800
801     Given a job id, this will return the cached job object if
802     existing, or try to load the job from the disk. If loading from
803     disk, it will also add the job to the cache.
804
805     @param job_id: the job id
806     @rtype: L{_QueuedJob} or None
807     @return: either None or the job object
808
809     """
810     job = self._memcache.get(job_id, None)
811     if job:
812       logging.debug("Found job %s in memcache", job_id)
813       return job
814
815     filepath = self._GetJobPath(job_id)
816     logging.debug("Loading job from %s", filepath)
817     try:
818       fd = open(filepath, "r")
819     except IOError, err:
820       if err.errno in (errno.ENOENT, ):
821         return None
822       raise
823     try:
824       data = serializer.LoadJson(fd.read())
825     finally:
826       fd.close()
827
828     try:
829       job = _QueuedJob.Restore(self, data)
830     except Exception, err:
831       new_path = self._GetArchivedJobPath(job_id)
832       if filepath == new_path:
833         # job already archived (future case)
834         logging.exception("Can't parse job %s", job_id)
835       else:
836         # non-archived case
837         logging.exception("Can't parse job %s, will archive.", job_id)
838         self._RenameFileUnlocked(filepath, new_path)
839       return None
840
841     self._memcache[job_id] = job
842     logging.debug("Added job %s to the cache", job_id)
843     return job
844
845   def _GetJobsUnlocked(self, job_ids):
846     """Return a list of jobs based on their IDs.
847
848     @type job_ids: list
849     @param job_ids: either an empty list (meaning all jobs),
850         or a list of job IDs
851     @rtype: list
852     @return: the list of job objects
853
854     """
855     if not job_ids:
856       job_ids = self._GetJobIDsUnlocked()
857
858     return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
859
860   @staticmethod
861   def _IsQueueMarkedDrain():
862     """Check if the queue is marked from drain.
863
864     This currently uses the queue drain file, which makes it a
865     per-node flag. In the future this can be moved to the config file.
866
867     @rtype: boolean
868     @return: True of the job queue is marked for draining
869
870     """
871     return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
872
873   @staticmethod
874   def SetDrainFlag(drain_flag):
875     """Sets the drain flag for the queue.
876
877     This is similar to the function L{backend.JobQueueSetDrainFlag},
878     and in the future we might merge them.
879
880     @type drain_flag: boolean
881     @param drain_flag: wheter to set or unset the drain flag
882
883     """
884     if drain_flag:
885       utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
886     else:
887       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
888     return True
889
890   @utils.LockedMethod
891   @_RequireOpenQueue
892   def SubmitJob(self, ops):
893     """Create and store a new job.
894
895     This enters the job into our job queue and also puts it on the new
896     queue, in order for it to be picked up by the queue processors.
897
898     @type ops: list
899     @param ops: The list of OpCodes that will become the new job.
900     @rtype: job ID
901     @return: the job ID of the newly created job
902     @raise errors.JobQueueDrainError: if the job is marked for draining
903
904     """
905     if self._IsQueueMarkedDrain():
906       raise errors.JobQueueDrainError()
907     # Get job identifier
908     job_id = self._NewSerialUnlocked()
909     job = _QueuedJob(self, job_id, ops)
910
911     # Write to disk
912     self.UpdateJobUnlocked(job)
913
914     logging.debug("Adding new job %s to the cache", job_id)
915     self._memcache[job_id] = job
916
917     # Add to worker pool
918     self._wpool.AddTask(job)
919
920     return job.id
921
922   @_RequireOpenQueue
923   def UpdateJobUnlocked(self, job):
924     """Update a job's on disk storage.
925
926     After a job has been modified, this function needs to be called in
927     order to write the changes to disk and replicate them to the other
928     nodes.
929
930     @type job: L{_QueuedJob}
931     @param job: the changed job
932
933     """
934     filename = self._GetJobPath(job.id)
935     data = serializer.DumpJson(job.Serialize(), indent=False)
936     logging.debug("Writing job %s to %s", job.id, filename)
937     self._WriteAndReplicateFileUnlocked(filename, data)
938
939     # Notify waiters about potential changes
940     job.change.notifyAll()
941
942   @utils.LockedMethod
943   @_RequireOpenQueue
944   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
945                         timeout):
946     """Waits for changes in a job.
947
948     @type job_id: string
949     @param job_id: Job identifier
950     @type fields: list of strings
951     @param fields: Which fields to check for changes
952     @type prev_job_info: list or None
953     @param prev_job_info: Last job information returned
954     @type prev_log_serial: int
955     @param prev_log_serial: Last job message serial number
956     @type timeout: float
957     @param timeout: maximum time to wait
958     @rtype: tuple (job info, log entries)
959     @return: a tuple of the job information as required via
960         the fields parameter, and the log entries as a list
961
962         if the job has not changed and the timeout has expired,
963         we instead return a special value,
964         L{constants.JOB_NOTCHANGED}, which should be interpreted
965         as such by the clients
966
967     """
968     logging.debug("Waiting for changes in job %s", job_id)
969     end_time = time.time() + timeout
970     while True:
971       delta_time = end_time - time.time()
972       if delta_time < 0:
973         return constants.JOB_NOTCHANGED
974
975       job = self._LoadJobUnlocked(job_id)
976       if not job:
977         logging.debug("Job %s not found", job_id)
978         break
979
980       status = job.CalcStatus()
981       job_info = self._GetJobInfoUnlocked(job, fields)
982       log_entries = job.GetLogEntries(prev_log_serial)
983
984       # Serializing and deserializing data can cause type changes (e.g. from
985       # tuple to list) or precision loss. We're doing it here so that we get
986       # the same modifications as the data received from the client. Without
987       # this, the comparison afterwards might fail without the data being
988       # significantly different.
989       job_info = serializer.LoadJson(serializer.DumpJson(job_info))
990       log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
991
992       if status not in (constants.JOB_STATUS_QUEUED,
993                         constants.JOB_STATUS_RUNNING,
994                         constants.JOB_STATUS_WAITLOCK):
995         # Don't even try to wait if the job is no longer running, there will be
996         # no changes.
997         break
998
999       if (prev_job_info != job_info or
1000           (log_entries and prev_log_serial != log_entries[0][0])):
1001         break
1002
1003       logging.debug("Waiting again")
1004
1005       # Release the queue lock while waiting
1006       job.change.wait(delta_time)
1007
1008     logging.debug("Job %s changed", job_id)
1009
1010     return (job_info, log_entries)
1011
1012   @utils.LockedMethod
1013   @_RequireOpenQueue
1014   def CancelJob(self, job_id):
1015     """Cancels a job.
1016
1017     This will only succeed if the job has not started yet.
1018
1019     @type job_id: string
1020     @param job_id: job ID of job to be cancelled.
1021
1022     """
1023     logging.debug("Cancelling job %s", job_id)
1024
1025     job = self._LoadJobUnlocked(job_id)
1026     if not job:
1027       logging.debug("Job %s not found", job_id)
1028       return
1029
1030     if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
1031       logging.debug("Job %s is no longer in the queue", job.id)
1032       return
1033
1034     try:
1035       for op in job.ops:
1036         op.status = constants.OP_STATUS_ERROR
1037         op.result = "Job cancelled by request"
1038     finally:
1039       self.UpdateJobUnlocked(job)
1040
1041   @_RequireOpenQueue
1042   def _ArchiveJobUnlocked(self, job_id):
1043     """Archives a job.
1044
1045     @type job_id: string
1046     @param job_id: Job ID of job to be archived.
1047
1048     """
1049     logging.info("Archiving job %s", job_id)
1050
1051     job = self._LoadJobUnlocked(job_id)
1052     if not job:
1053       logging.debug("Job %s not found", job_id)
1054       return
1055
1056     if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1057                                 constants.JOB_STATUS_SUCCESS,
1058                                 constants.JOB_STATUS_ERROR):
1059       logging.debug("Job %s is not yet done", job.id)
1060       return
1061
1062     old = self._GetJobPath(job.id)
1063     new = self._GetArchivedJobPath(job.id)
1064
1065     self._RenameFileUnlocked(old, new)
1066
1067     logging.debug("Successfully archived job %s", job.id)
1068
1069   @utils.LockedMethod
1070   @_RequireOpenQueue
1071   def ArchiveJob(self, job_id):
1072     """Archives a job.
1073
1074     This is just a wrapper over L{_ArchiveJobUnlocked}.
1075
1076     @type job_id: string
1077     @param job_id: Job ID of job to be archived.
1078
1079     """
1080     return self._ArchiveJobUnlocked(job_id)
1081
1082   @utils.LockedMethod
1083   @_RequireOpenQueue
1084   def AutoArchiveJobs(self, age):
1085     """Archives all jobs based on age.
1086
1087     The method will archive all jobs which are older than the age
1088     parameter. For jobs that don't have an end timestamp, the start
1089     timestamp will be considered. The special '-1' age will cause
1090     archival of all jobs (that are not running or queued).
1091
1092     @type age: int
1093     @param age: the minimum age in seconds
1094
1095     """
1096     logging.info("Archiving jobs with age more than %s seconds", age)
1097
1098     now = time.time()
1099     for jid in self._GetJobIDsUnlocked(archived=False):
1100       job = self._LoadJobUnlocked(jid)
1101       if job.CalcStatus() not in (constants.OP_STATUS_SUCCESS,
1102                                   constants.OP_STATUS_ERROR,
1103                                   constants.OP_STATUS_CANCELED):
1104         continue
1105       if job.end_timestamp is None:
1106         if job.start_timestamp is None:
1107           job_age = job.received_timestamp
1108         else:
1109           job_age = job.start_timestamp
1110       else:
1111         job_age = job.end_timestamp
1112
1113       if age == -1 or now - job_age[0] > age:
1114         self._ArchiveJobUnlocked(jid)
1115
1116   def _GetJobInfoUnlocked(self, job, fields):
1117     """Returns information about a job.
1118
1119     @type job: L{_QueuedJob}
1120     @param job: the job which we query
1121     @type fields: list
1122     @param fields: names of fields to return
1123     @rtype: list
1124     @return: list with one element for each field
1125     @raise errors.OpExecError: when an invalid field
1126         has been passed
1127
1128     """
1129     row = []
1130     for fname in fields:
1131       if fname == "id":
1132         row.append(job.id)
1133       elif fname == "status":
1134         row.append(job.CalcStatus())
1135       elif fname == "ops":
1136         row.append([op.input.__getstate__() for op in job.ops])
1137       elif fname == "opresult":
1138         row.append([op.result for op in job.ops])
1139       elif fname == "opstatus":
1140         row.append([op.status for op in job.ops])
1141       elif fname == "oplog":
1142         row.append([op.log for op in job.ops])
1143       elif fname == "opstart":
1144         row.append([op.start_timestamp for op in job.ops])
1145       elif fname == "opend":
1146         row.append([op.end_timestamp for op in job.ops])
1147       elif fname == "received_ts":
1148         row.append(job.received_timestamp)
1149       elif fname == "start_ts":
1150         row.append(job.start_timestamp)
1151       elif fname == "end_ts":
1152         row.append(job.end_timestamp)
1153       elif fname == "summary":
1154         row.append([op.input.Summary() for op in job.ops])
1155       else:
1156         raise errors.OpExecError("Invalid job query field '%s'" % fname)
1157     return row
1158
1159   @utils.LockedMethod
1160   @_RequireOpenQueue
1161   def QueryJobs(self, job_ids, fields):
1162     """Returns a list of jobs in queue.
1163
1164     This is a wrapper of L{_GetJobsUnlocked}, which actually does the
1165     processing for each job.
1166
1167     @type job_ids: list
1168     @param job_ids: sequence of job identifiers or None for all
1169     @type fields: list
1170     @param fields: names of fields to return
1171     @rtype: list
1172     @return: list one element per job, each element being list with
1173         the requested fields
1174
1175     """
1176     jobs = []
1177
1178     for job in self._GetJobsUnlocked(job_ids):
1179       if job is None:
1180         jobs.append(None)
1181       else:
1182         jobs.append(self._GetJobInfoUnlocked(job, fields))
1183
1184     return jobs
1185
1186   @utils.LockedMethod
1187   @_RequireOpenQueue
1188   def Shutdown(self):
1189     """Stops the job queue.
1190
1191     This shutdowns all the worker threads an closes the queue.
1192
1193     """
1194     self._wpool.TerminateWorkers()
1195
1196     self._queue_lock.Close()
1197     self._queue_lock = None