jqueue: Shutdown workerpool in case of a problem
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import os
33 import logging
34 import threading
35 import errno
36 import re
37 import time
38 import weakref
39
40 from ganeti import constants
41 from ganeti import serializer
42 from ganeti import workerpool
43 from ganeti import opcodes
44 from ganeti import errors
45 from ganeti import mcpu
46 from ganeti import utils
47 from ganeti import jstore
48 from ganeti import rpc
49
50 JOBQUEUE_THREADS = 25
51
52
53 def TimeStampNow():
54   """Returns the current timestamp.
55
56   @rtype: tuple
57   @return: the current time in the (seconds, microseconds) format
58
59   """
60   return utils.SplitTime(time.time())
61
62
63 class _QueuedOpCode(object):
64   """Encasulates an opcode object.
65
66   @ivar log: holds the execution log and consists of tuples
67   of the form C{(log_serial, timestamp, level, message)}
68   @ivar input: the OpCode we encapsulate
69   @ivar status: the current status
70   @ivar result: the result of the LU execution
71   @ivar start_timestamp: timestamp for the start of the execution
72   @ivar stop_timestamp: timestamp for the end of the execution
73
74   """
75   def __init__(self, op):
76     """Constructor for the _QuededOpCode.
77
78     @type op: L{opcodes.OpCode}
79     @param op: the opcode we encapsulate
80
81     """
82     self.input = op
83     self.status = constants.OP_STATUS_QUEUED
84     self.result = None
85     self.log = []
86     self.start_timestamp = None
87     self.end_timestamp = None
88
89   @classmethod
90   def Restore(cls, state):
91     """Restore the _QueuedOpCode from the serialized form.
92
93     @type state: dict
94     @param state: the serialized state
95     @rtype: _QueuedOpCode
96     @return: a new _QueuedOpCode instance
97
98     """
99     obj = _QueuedOpCode.__new__(cls)
100     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
101     obj.status = state["status"]
102     obj.result = state["result"]
103     obj.log = state["log"]
104     obj.start_timestamp = state.get("start_timestamp", None)
105     obj.end_timestamp = state.get("end_timestamp", None)
106     return obj
107
108   def Serialize(self):
109     """Serializes this _QueuedOpCode.
110
111     @rtype: dict
112     @return: the dictionary holding the serialized state
113
114     """
115     return {
116       "input": self.input.__getstate__(),
117       "status": self.status,
118       "result": self.result,
119       "log": self.log,
120       "start_timestamp": self.start_timestamp,
121       "end_timestamp": self.end_timestamp,
122       }
123
124
125 class _QueuedJob(object):
126   """In-memory job representation.
127
128   This is what we use to track the user-submitted jobs. Locking must
129   be taken care of by users of this class.
130
131   @type queue: L{JobQueue}
132   @ivar queue: the parent queue
133   @ivar id: the job ID
134   @type ops: list
135   @ivar ops: the list of _QueuedOpCode that constitute the job
136   @type run_op_index: int
137   @ivar run_op_index: the currently executing opcode, or -1 if
138       we didn't yet start executing
139   @type log_serial: int
140   @ivar log_serial: holds the index for the next log entry
141   @ivar received_timestamp: the timestamp for when the job was received
142   @ivar start_timestmap: the timestamp for start of execution
143   @ivar end_timestamp: the timestamp for end of execution
144   @ivar change: a Condition variable we use for waiting for job changes
145
146   """
147   def __init__(self, queue, job_id, ops):
148     """Constructor for the _QueuedJob.
149
150     @type queue: L{JobQueue}
151     @param queue: our parent queue
152     @type job_id: job_id
153     @param job_id: our job id
154     @type ops: list
155     @param ops: the list of opcodes we hold, which will be encapsulated
156         in _QueuedOpCodes
157
158     """
159     if not ops:
160       # TODO: use a better exception
161       raise Exception("No opcodes")
162
163     self.queue = queue
164     self.id = job_id
165     self.ops = [_QueuedOpCode(op) for op in ops]
166     self.run_op_index = -1
167     self.log_serial = 0
168     self.received_timestamp = TimeStampNow()
169     self.start_timestamp = None
170     self.end_timestamp = None
171
172     # Condition to wait for changes
173     self.change = threading.Condition(self.queue._lock)
174
175   @classmethod
176   def Restore(cls, queue, state):
177     """Restore a _QueuedJob from serialized state:
178
179     @type queue: L{JobQueue}
180     @param queue: to which queue the restored job belongs
181     @type state: dict
182     @param state: the serialized state
183     @rtype: _JobQueue
184     @return: the restored _JobQueue instance
185
186     """
187     obj = _QueuedJob.__new__(cls)
188     obj.queue = queue
189     obj.id = state["id"]
190     obj.run_op_index = state["run_op_index"]
191     obj.received_timestamp = state.get("received_timestamp", None)
192     obj.start_timestamp = state.get("start_timestamp", None)
193     obj.end_timestamp = state.get("end_timestamp", None)
194
195     obj.ops = []
196     obj.log_serial = 0
197     for op_state in state["ops"]:
198       op = _QueuedOpCode.Restore(op_state)
199       for log_entry in op.log:
200         obj.log_serial = max(obj.log_serial, log_entry[0])
201       obj.ops.append(op)
202
203     # Condition to wait for changes
204     obj.change = threading.Condition(obj.queue._lock)
205
206     return obj
207
208   def Serialize(self):
209     """Serialize the _JobQueue instance.
210
211     @rtype: dict
212     @return: the serialized state
213
214     """
215     return {
216       "id": self.id,
217       "ops": [op.Serialize() for op in self.ops],
218       "run_op_index": self.run_op_index,
219       "start_timestamp": self.start_timestamp,
220       "end_timestamp": self.end_timestamp,
221       "received_timestamp": self.received_timestamp,
222       }
223
224   def CalcStatus(self):
225     """Compute the status of this job.
226
227     This function iterates over all the _QueuedOpCodes in the job and
228     based on their status, computes the job status.
229
230     The algorithm is:
231       - if we find a cancelled, or finished with error, the job
232         status will be the same
233       - otherwise, the last opcode with the status one of:
234           - waitlock
235           - running
236
237         will determine the job status
238
239       - otherwise, it means either all opcodes are queued, or success,
240         and the job status will be the same
241
242     @return: the job status
243
244     """
245     status = constants.JOB_STATUS_QUEUED
246
247     all_success = True
248     for op in self.ops:
249       if op.status == constants.OP_STATUS_SUCCESS:
250         continue
251
252       all_success = False
253
254       if op.status == constants.OP_STATUS_QUEUED:
255         pass
256       elif op.status == constants.OP_STATUS_WAITLOCK:
257         status = constants.JOB_STATUS_WAITLOCK
258       elif op.status == constants.OP_STATUS_RUNNING:
259         status = constants.JOB_STATUS_RUNNING
260       elif op.status == constants.OP_STATUS_ERROR:
261         status = constants.JOB_STATUS_ERROR
262         # The whole job fails if one opcode failed
263         break
264       elif op.status == constants.OP_STATUS_CANCELED:
265         status = constants.OP_STATUS_CANCELED
266         break
267
268     if all_success:
269       status = constants.JOB_STATUS_SUCCESS
270
271     return status
272
273   def GetLogEntries(self, newer_than):
274     """Selectively returns the log entries.
275
276     @type newer_than: None or int
277     @param newer_than: if this is None, return all log enties,
278         otherwise return only the log entries with serial higher
279         than this value
280     @rtype: list
281     @return: the list of the log entries selected
282
283     """
284     if newer_than is None:
285       serial = -1
286     else:
287       serial = newer_than
288
289     entries = []
290     for op in self.ops:
291       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
292
293     return entries
294
295
296 class _JobQueueWorker(workerpool.BaseWorker):
297   """The actual job workers.
298
299   """
300   def _NotifyStart(self):
301     """Mark the opcode as running, not lock-waiting.
302
303     This is called from the mcpu code as a notifier function, when the
304     LU is finally about to start the Exec() method. Of course, to have
305     end-user visible results, the opcode must be initially (before
306     calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
307
308     """
309     assert self.queue, "Queue attribute is missing"
310     assert self.opcode, "Opcode attribute is missing"
311
312     self.queue.acquire()
313     try:
314       self.opcode.status = constants.OP_STATUS_RUNNING
315     finally:
316       self.queue.release()
317
318   def RunTask(self, job):
319     """Job executor.
320
321     This functions processes a job. It is closely tied to the _QueuedJob and
322     _QueuedOpCode classes.
323
324     @type job: L{_QueuedJob}
325     @param job: the job to be processed
326
327     """
328     logging.debug("Worker %s processing job %s",
329                   self.worker_id, job.id)
330     proc = mcpu.Processor(self.pool.queue.context)
331     self.queue = queue = job.queue
332     try:
333       try:
334         count = len(job.ops)
335         for idx, op in enumerate(job.ops):
336           try:
337             logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
338
339             queue.acquire()
340             try:
341               job.run_op_index = idx
342               op.status = constants.OP_STATUS_WAITLOCK
343               op.result = None
344               op.start_timestamp = TimeStampNow()
345               if idx == 0: # first opcode
346                 job.start_timestamp = op.start_timestamp
347               queue.UpdateJobUnlocked(job)
348
349               input_opcode = op.input
350             finally:
351               queue.release()
352
353             def _Log(*args):
354               """Append a log entry.
355
356               """
357               assert len(args) < 3
358
359               if len(args) == 1:
360                 log_type = constants.ELOG_MESSAGE
361                 log_msg = args[0]
362               else:
363                 log_type, log_msg = args
364
365               # The time is split to make serialization easier and not lose
366               # precision.
367               timestamp = utils.SplitTime(time.time())
368
369               queue.acquire()
370               try:
371                 job.log_serial += 1
372                 op.log.append((job.log_serial, timestamp, log_type, log_msg))
373
374                 job.change.notifyAll()
375               finally:
376                 queue.release()
377
378             # Make sure not to hold lock while _Log is called
379             self.opcode = op
380             result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
381
382             queue.acquire()
383             try:
384               op.status = constants.OP_STATUS_SUCCESS
385               op.result = result
386               op.end_timestamp = TimeStampNow()
387               queue.UpdateJobUnlocked(job)
388             finally:
389               queue.release()
390
391             logging.debug("Op %s/%s: Successfully finished %s",
392                           idx + 1, count, op)
393           except Exception, err:
394             queue.acquire()
395             try:
396               try:
397                 op.status = constants.OP_STATUS_ERROR
398                 op.result = str(err)
399                 op.end_timestamp = TimeStampNow()
400                 logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
401               finally:
402                 queue.UpdateJobUnlocked(job)
403             finally:
404               queue.release()
405             raise
406
407       except errors.GenericError, err:
408         logging.exception("Ganeti exception")
409       except:
410         logging.exception("Unhandled exception")
411     finally:
412       queue.acquire()
413       try:
414         try:
415           job.run_op_idx = -1
416           job.end_timestamp = TimeStampNow()
417           queue.UpdateJobUnlocked(job)
418         finally:
419           job_id = job.id
420           status = job.CalcStatus()
421       finally:
422         queue.release()
423       logging.debug("Worker %s finished job %s, status = %s",
424                     self.worker_id, job_id, status)
425
426
427 class _JobQueueWorkerPool(workerpool.WorkerPool):
428   """Simple class implementing a job-processing workerpool.
429
430   """
431   def __init__(self, queue):
432     super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
433                                               _JobQueueWorker)
434     self.queue = queue
435
436
437 class JobQueue(object):
438   """Quue used to manaage the jobs.
439
440   @cvar _RE_JOB_FILE: regex matching the valid job file names
441
442   """
443   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
444
445   def _RequireOpenQueue(fn):
446     """Decorator for "public" functions.
447
448     This function should be used for all 'public' functions. That is,
449     functions usually called from other classes.
450
451     @warning: Use this decorator only after utils.LockedMethod!
452
453     Example::
454       @utils.LockedMethod
455       @_RequireOpenQueue
456       def Example(self):
457         pass
458
459     """
460     def wrapper(self, *args, **kwargs):
461       assert self._queue_lock is not None, "Queue should be open"
462       return fn(self, *args, **kwargs)
463     return wrapper
464
465   def __init__(self, context):
466     """Constructor for JobQueue.
467
468     The constructor will initialize the job queue object and then
469     start loading the current jobs from disk, either for starting them
470     (if they were queue) or for aborting them (if they were already
471     running).
472
473     @type context: GanetiContext
474     @param context: the context object for access to the configuration
475         data and other ganeti objects
476
477     """
478     self.context = context
479     self._memcache = weakref.WeakValueDictionary()
480     self._my_hostname = utils.HostInfo().name
481
482     # Locking
483     self._lock = threading.Lock()
484     self.acquire = self._lock.acquire
485     self.release = self._lock.release
486
487     # Initialize
488     self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
489
490     # Read serial file
491     self._last_serial = jstore.ReadSerial()
492     assert self._last_serial is not None, ("Serial file was modified between"
493                                            " check in jstore and here")
494
495     # Get initial list of nodes
496     self._nodes = dict((n.name, n.primary_ip)
497                        for n in self.context.cfg.GetAllNodesInfo().values())
498
499     # Remove master node
500     try:
501       del self._nodes[self._my_hostname]
502     except ValueError:
503       pass
504
505     # TODO: Check consistency across nodes
506
507     # Setup worker pool
508     self._wpool = _JobQueueWorkerPool(self)
509     try:
510       # We need to lock here because WorkerPool.AddTask() may start a job while
511       # we're still doing our work.
512       self.acquire()
513       try:
514         for job in self._GetJobsUnlocked(None):
515           # a failure in loading the job can cause 'None' to be returned
516           if job is None:
517             continue
518
519           status = job.CalcStatus()
520
521           if status in (constants.JOB_STATUS_QUEUED, ):
522             self._wpool.AddTask(job)
523
524           elif status in (constants.JOB_STATUS_RUNNING,
525                           constants.JOB_STATUS_WAITLOCK):
526             logging.warning("Unfinished job %s found: %s", job.id, job)
527             try:
528               for op in job.ops:
529                 op.status = constants.OP_STATUS_ERROR
530                 op.result = "Unclean master daemon shutdown"
531             finally:
532               self.UpdateJobUnlocked(job)
533       finally:
534         self.release()
535     except:
536       self._wpool.TerminateWorkers()
537       raise
538
539   @utils.LockedMethod
540   @_RequireOpenQueue
541   def AddNode(self, node):
542     """Register a new node with the queue.
543
544     @type node: L{objects.Node}
545     @param node: the node object to be added
546
547     """
548     node_name = node.name
549     assert node_name != self._my_hostname
550
551     # Clean queue directory on added node
552     rpc.RpcRunner.call_jobqueue_purge(node_name)
553
554     # Upload the whole queue excluding archived jobs
555     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
556
557     # Upload current serial file
558     files.append(constants.JOB_QUEUE_SERIAL_FILE)
559
560     for file_name in files:
561       # Read file content
562       fd = open(file_name, "r")
563       try:
564         content = fd.read()
565       finally:
566         fd.close()
567
568       result = rpc.RpcRunner.call_jobqueue_update([node_name],
569                                                   [node.primary_ip],
570                                                   file_name, content)
571       if not result[node_name]:
572         logging.error("Failed to upload %s to %s", file_name, node_name)
573
574     self._nodes[node_name] = node.primary_ip
575
576   @utils.LockedMethod
577   @_RequireOpenQueue
578   def RemoveNode(self, node_name):
579     """Callback called when removing nodes from the cluster.
580
581     @type node_name: str
582     @param node_name: the name of the node to remove
583
584     """
585     try:
586       # The queue is removed by the "leave node" RPC call.
587       del self._nodes[node_name]
588     except KeyError:
589       pass
590
591   def _CheckRpcResult(self, result, nodes, failmsg):
592     """Verifies the status of an RPC call.
593
594     Since we aim to keep consistency should this node (the current
595     master) fail, we will log errors if our rpc fail, and especially
596     log the case when more than half of the nodes failes.
597
598     @param result: the data as returned from the rpc call
599     @type nodes: list
600     @param nodes: the list of nodes we made the call to
601     @type failmsg: str
602     @param failmsg: the identifier to be used for logging
603
604     """
605     failed = []
606     success = []
607
608     for node in nodes:
609       if result[node]:
610         success.append(node)
611       else:
612         failed.append(node)
613
614     if failed:
615       logging.error("%s failed on %s", failmsg, ", ".join(failed))
616
617     # +1 for the master node
618     if (len(success) + 1) < len(failed):
619       # TODO: Handle failing nodes
620       logging.error("More than half of the nodes failed")
621
622   def _GetNodeIp(self):
623     """Helper for returning the node name/ip list.
624
625     @rtype: (list, list)
626     @return: a tuple of two lists, the first one with the node
627         names and the second one with the node addresses
628
629     """
630     name_list = self._nodes.keys()
631     addr_list = [self._nodes[name] for name in name_list]
632     return name_list, addr_list
633
634   def _WriteAndReplicateFileUnlocked(self, file_name, data):
635     """Writes a file locally and then replicates it to all nodes.
636
637     This function will replace the contents of a file on the local
638     node and then replicate it to all the other nodes we have.
639
640     @type file_name: str
641     @param file_name: the path of the file to be replicated
642     @type data: str
643     @param data: the new contents of the file
644
645     """
646     utils.WriteFile(file_name, data=data)
647
648     names, addrs = self._GetNodeIp()
649     result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
650     self._CheckRpcResult(result, self._nodes,
651                          "Updating %s" % file_name)
652
653   def _RenameFileUnlocked(self, old, new):
654     """Renames a file locally and then replicate the change.
655
656     This function will rename a file in the local queue directory
657     and then replicate this rename to all the other nodes we have.
658
659     @type old: str
660     @param old: the current name of the file
661     @type new: str
662     @param new: the new name of the file
663
664     """
665     os.rename(old, new)
666
667     names, addrs = self._GetNodeIp()
668     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, old, new)
669     self._CheckRpcResult(result, self._nodes,
670                          "Moving %s to %s" % (old, new))
671
672   def _FormatJobID(self, job_id):
673     """Convert a job ID to string format.
674
675     Currently this just does C{str(job_id)} after performing some
676     checks, but if we want to change the job id format this will
677     abstract this change.
678
679     @type job_id: int or long
680     @param job_id: the numeric job id
681     @rtype: str
682     @return: the formatted job id
683
684     """
685     if not isinstance(job_id, (int, long)):
686       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
687     if job_id < 0:
688       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
689
690     return str(job_id)
691
692   def _NewSerialUnlocked(self):
693     """Generates a new job identifier.
694
695     Job identifiers are unique during the lifetime of a cluster.
696
697     @rtype: str
698     @return: a string representing the job identifier.
699
700     """
701     # New number
702     serial = self._last_serial + 1
703
704     # Write to file
705     self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
706                                         "%s\n" % serial)
707
708     # Keep it only if we were able to write the file
709     self._last_serial = serial
710
711     return self._FormatJobID(serial)
712
713   @staticmethod
714   def _GetJobPath(job_id):
715     """Returns the job file for a given job id.
716
717     @type job_id: str
718     @param job_id: the job identifier
719     @rtype: str
720     @return: the path to the job file
721
722     """
723     return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
724
725   @staticmethod
726   def _GetArchivedJobPath(job_id):
727     """Returns the archived job file for a give job id.
728
729     @type job_id: str
730     @param job_id: the job identifier
731     @rtype: str
732     @return: the path to the archived job file
733
734     """
735     return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
736
737   @classmethod
738   def _ExtractJobID(cls, name):
739     """Extract the job id from a filename.
740
741     @type name: str
742     @param name: the job filename
743     @rtype: job id or None
744     @return: the job id corresponding to the given filename,
745         or None if the filename does not represent a valid
746         job file
747
748     """
749     m = cls._RE_JOB_FILE.match(name)
750     if m:
751       return m.group(1)
752     else:
753       return None
754
755   def _GetJobIDsUnlocked(self, archived=False):
756     """Return all known job IDs.
757
758     If the parameter archived is True, archived jobs IDs will be
759     included. Currently this argument is unused.
760
761     The method only looks at disk because it's a requirement that all
762     jobs are present on disk (so in the _memcache we don't have any
763     extra IDs).
764
765     @rtype: list
766     @return: the list of job IDs
767
768     """
769     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
770     jlist = utils.NiceSort(jlist)
771     return jlist
772
773   def _ListJobFiles(self):
774     """Returns the list of current job files.
775
776     @rtype: list
777     @return: the list of job file names
778
779     """
780     return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
781             if self._RE_JOB_FILE.match(name)]
782
783   def _LoadJobUnlocked(self, job_id):
784     """Loads a job from the disk or memory.
785
786     Given a job id, this will return the cached job object if
787     existing, or try to load the job from the disk. If loading from
788     disk, it will also add the job to the cache.
789
790     @param job_id: the job id
791     @rtype: L{_QueuedJob} or None
792     @return: either None or the job object
793
794     """
795     job = self._memcache.get(job_id, None)
796     if job:
797       logging.debug("Found job %s in memcache", job_id)
798       return job
799
800     filepath = self._GetJobPath(job_id)
801     logging.debug("Loading job from %s", filepath)
802     try:
803       fd = open(filepath, "r")
804     except IOError, err:
805       if err.errno in (errno.ENOENT, ):
806         return None
807       raise
808     try:
809       data = serializer.LoadJson(fd.read())
810     finally:
811       fd.close()
812
813     try:
814       job = _QueuedJob.Restore(self, data)
815     except Exception, err:
816       new_path = self._GetArchivedJobPath(job_id)
817       if filepath == new_path:
818         # job already archived (future case)
819         logging.exception("Can't parse job %s", job_id)
820       else:
821         # non-archived case
822         logging.exception("Can't parse job %s, will archive.", job_id)
823         self._RenameFileUnlocked(filepath, new_path)
824       return None
825
826     self._memcache[job_id] = job
827     logging.debug("Added job %s to the cache", job_id)
828     return job
829
830   def _GetJobsUnlocked(self, job_ids):
831     """Return a list of jobs based on their IDs.
832
833     @type job_ids: list
834     @param job_ids: either an empty list (meaning all jobs),
835         or a list of job IDs
836     @rtype: list
837     @return: the list of job objects
838
839     """
840     if not job_ids:
841       job_ids = self._GetJobIDsUnlocked()
842
843     return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
844
845   @staticmethod
846   def _IsQueueMarkedDrain():
847     """Check if the queue is marked from drain.
848
849     This currently uses the queue drain file, which makes it a
850     per-node flag. In the future this can be moved to the config file.
851
852     @rtype: boolean
853     @return: True of the job queue is marked for draining
854
855     """
856     return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
857
858   @staticmethod
859   def SetDrainFlag(drain_flag):
860     """Sets the drain flag for the queue.
861
862     This is similar to the function L{backend.JobQueueSetDrainFlag},
863     and in the future we might merge them.
864
865     @type drain_flag: boolean
866     @param drain_flag: wheter to set or unset the drain flag
867
868     """
869     if drain_flag:
870       utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
871     else:
872       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
873     return True
874
875   @utils.LockedMethod
876   @_RequireOpenQueue
877   def SubmitJob(self, ops):
878     """Create and store a new job.
879
880     This enters the job into our job queue and also puts it on the new
881     queue, in order for it to be picked up by the queue processors.
882
883     @type ops: list
884     @param ops: The list of OpCodes that will become the new job.
885     @rtype: job ID
886     @return: the job ID of the newly created job
887     @raise errors.JobQueueDrainError: if the job is marked for draining
888
889     """
890     if self._IsQueueMarkedDrain():
891       raise errors.JobQueueDrainError()
892     # Get job identifier
893     job_id = self._NewSerialUnlocked()
894     job = _QueuedJob(self, job_id, ops)
895
896     # Write to disk
897     self.UpdateJobUnlocked(job)
898
899     logging.debug("Adding new job %s to the cache", job_id)
900     self._memcache[job_id] = job
901
902     # Add to worker pool
903     self._wpool.AddTask(job)
904
905     return job.id
906
907   @_RequireOpenQueue
908   def UpdateJobUnlocked(self, job):
909     """Update a job's on disk storage.
910
911     After a job has been modified, this function needs to be called in
912     order to write the changes to disk and replicate them to the other
913     nodes.
914
915     @type job: L{_QueuedJob}
916     @param job: the changed job
917
918     """
919     filename = self._GetJobPath(job.id)
920     data = serializer.DumpJson(job.Serialize(), indent=False)
921     logging.debug("Writing job %s to %s", job.id, filename)
922     self._WriteAndReplicateFileUnlocked(filename, data)
923
924     # Notify waiters about potential changes
925     job.change.notifyAll()
926
927   @utils.LockedMethod
928   @_RequireOpenQueue
929   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
930                         timeout):
931     """Waits for changes in a job.
932
933     @type job_id: string
934     @param job_id: Job identifier
935     @type fields: list of strings
936     @param fields: Which fields to check for changes
937     @type prev_job_info: list or None
938     @param prev_job_info: Last job information returned
939     @type prev_log_serial: int
940     @param prev_log_serial: Last job message serial number
941     @type timeout: float
942     @param timeout: maximum time to wait
943     @rtype: tuple (job info, log entries)
944     @return: a tuple of the job information as required via
945         the fields parameter, and the log entries as a list
946
947         if the job has not changed and the timeout has expired,
948         we instead return a special value,
949         L{constants.JOB_NOTCHANGED}, which should be interpreted
950         as such by the clients
951
952     """
953     logging.debug("Waiting for changes in job %s", job_id)
954     end_time = time.time() + timeout
955     while True:
956       delta_time = end_time - time.time()
957       if delta_time < 0:
958         return constants.JOB_NOTCHANGED
959
960       job = self._LoadJobUnlocked(job_id)
961       if not job:
962         logging.debug("Job %s not found", job_id)
963         break
964
965       status = job.CalcStatus()
966       job_info = self._GetJobInfoUnlocked(job, fields)
967       log_entries = job.GetLogEntries(prev_log_serial)
968
969       # Serializing and deserializing data can cause type changes (e.g. from
970       # tuple to list) or precision loss. We're doing it here so that we get
971       # the same modifications as the data received from the client. Without
972       # this, the comparison afterwards might fail without the data being
973       # significantly different.
974       job_info = serializer.LoadJson(serializer.DumpJson(job_info))
975       log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
976
977       if status not in (constants.JOB_STATUS_QUEUED,
978                         constants.JOB_STATUS_RUNNING,
979                         constants.JOB_STATUS_WAITLOCK):
980         # Don't even try to wait if the job is no longer running, there will be
981         # no changes.
982         break
983
984       if (prev_job_info != job_info or
985           (log_entries and prev_log_serial != log_entries[0][0])):
986         break
987
988       logging.debug("Waiting again")
989
990       # Release the queue lock while waiting
991       job.change.wait(delta_time)
992
993     logging.debug("Job %s changed", job_id)
994
995     return (job_info, log_entries)
996
997   @utils.LockedMethod
998   @_RequireOpenQueue
999   def CancelJob(self, job_id):
1000     """Cancels a job.
1001
1002     This will only succeed if the job has not started yet.
1003
1004     @type job_id: string
1005     @param job_id: job ID of job to be cancelled.
1006
1007     """
1008     logging.debug("Cancelling job %s", job_id)
1009
1010     job = self._LoadJobUnlocked(job_id)
1011     if not job:
1012       logging.debug("Job %s not found", job_id)
1013       return
1014
1015     if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
1016       logging.debug("Job %s is no longer in the queue", job.id)
1017       return
1018
1019     try:
1020       for op in job.ops:
1021         op.status = constants.OP_STATUS_ERROR
1022         op.result = "Job cancelled by request"
1023     finally:
1024       self.UpdateJobUnlocked(job)
1025
1026   @_RequireOpenQueue
1027   def _ArchiveJobUnlocked(self, job_id):
1028     """Archives a job.
1029
1030     @type job_id: string
1031     @param job_id: Job ID of job to be archived.
1032
1033     """
1034     logging.info("Archiving job %s", job_id)
1035
1036     job = self._LoadJobUnlocked(job_id)
1037     if not job:
1038       logging.debug("Job %s not found", job_id)
1039       return
1040
1041     if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1042                                 constants.JOB_STATUS_SUCCESS,
1043                                 constants.JOB_STATUS_ERROR):
1044       logging.debug("Job %s is not yet done", job.id)
1045       return
1046
1047     old = self._GetJobPath(job.id)
1048     new = self._GetArchivedJobPath(job.id)
1049
1050     self._RenameFileUnlocked(old, new)
1051
1052     logging.debug("Successfully archived job %s", job.id)
1053
1054   @utils.LockedMethod
1055   @_RequireOpenQueue
1056   def ArchiveJob(self, job_id):
1057     """Archives a job.
1058
1059     This is just a wrapper over L{_ArchiveJobUnlocked}.
1060
1061     @type job_id: string
1062     @param job_id: Job ID of job to be archived.
1063
1064     """
1065     return self._ArchiveJobUnlocked(job_id)
1066
1067   @utils.LockedMethod
1068   @_RequireOpenQueue
1069   def AutoArchiveJobs(self, age):
1070     """Archives all jobs based on age.
1071
1072     The method will archive all jobs which are older than the age
1073     parameter. For jobs that don't have an end timestamp, the start
1074     timestamp will be considered. The special '-1' age will cause
1075     archival of all jobs (that are not running or queued).
1076
1077     @type age: int
1078     @param age: the minimum age in seconds
1079
1080     """
1081     logging.info("Archiving jobs with age more than %s seconds", age)
1082
1083     now = time.time()
1084     for jid in self._GetJobIDsUnlocked(archived=False):
1085       job = self._LoadJobUnlocked(jid)
1086       if job.CalcStatus() not in (constants.OP_STATUS_SUCCESS,
1087                                   constants.OP_STATUS_ERROR,
1088                                   constants.OP_STATUS_CANCELED):
1089         continue
1090       if job.end_timestamp is None:
1091         if job.start_timestamp is None:
1092           job_age = job.received_timestamp
1093         else:
1094           job_age = job.start_timestamp
1095       else:
1096         job_age = job.end_timestamp
1097
1098       if age == -1 or now - job_age[0] > age:
1099         self._ArchiveJobUnlocked(jid)
1100
1101   def _GetJobInfoUnlocked(self, job, fields):
1102     """Returns information about a job.
1103
1104     @type job: L{_QueuedJob}
1105     @param job: the job which we query
1106     @type fields: list
1107     @param fields: names of fields to return
1108     @rtype: list
1109     @return: list with one element for each field
1110     @raise errors.OpExecError: when an invalid field
1111         has been passed
1112
1113     """
1114     row = []
1115     for fname in fields:
1116       if fname == "id":
1117         row.append(job.id)
1118       elif fname == "status":
1119         row.append(job.CalcStatus())
1120       elif fname == "ops":
1121         row.append([op.input.__getstate__() for op in job.ops])
1122       elif fname == "opresult":
1123         row.append([op.result for op in job.ops])
1124       elif fname == "opstatus":
1125         row.append([op.status for op in job.ops])
1126       elif fname == "oplog":
1127         row.append([op.log for op in job.ops])
1128       elif fname == "opstart":
1129         row.append([op.start_timestamp for op in job.ops])
1130       elif fname == "opend":
1131         row.append([op.end_timestamp for op in job.ops])
1132       elif fname == "received_ts":
1133         row.append(job.received_timestamp)
1134       elif fname == "start_ts":
1135         row.append(job.start_timestamp)
1136       elif fname == "end_ts":
1137         row.append(job.end_timestamp)
1138       elif fname == "summary":
1139         row.append([op.input.Summary() for op in job.ops])
1140       else:
1141         raise errors.OpExecError("Invalid job query field '%s'" % fname)
1142     return row
1143
1144   @utils.LockedMethod
1145   @_RequireOpenQueue
1146   def QueryJobs(self, job_ids, fields):
1147     """Returns a list of jobs in queue.
1148
1149     This is a wrapper of L{_GetJobsUnlocked}, which actually does the
1150     processing for each job.
1151
1152     @type job_ids: list
1153     @param job_ids: sequence of job identifiers or None for all
1154     @type fields: list
1155     @param fields: names of fields to return
1156     @rtype: list
1157     @return: list one element per job, each element being list with
1158         the requested fields
1159
1160     """
1161     jobs = []
1162
1163     for job in self._GetJobsUnlocked(job_ids):
1164       if job is None:
1165         jobs.append(None)
1166       else:
1167         jobs.append(self._GetJobInfoUnlocked(job, fields))
1168
1169     return jobs
1170
1171   @utils.LockedMethod
1172   @_RequireOpenQueue
1173   def Shutdown(self):
1174     """Stops the job queue.
1175
1176     This shutdowns all the worker threads an closes the queue.
1177
1178     """
1179     self._wpool.TerminateWorkers()
1180
1181     self._queue_lock.Close()
1182     self._queue_lock = None