Optimise multi-job submit
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import os
33 import logging
34 import threading
35 import errno
36 import re
37 import time
38 import weakref
39
40 from ganeti import constants
41 from ganeti import serializer
42 from ganeti import workerpool
43 from ganeti import opcodes
44 from ganeti import errors
45 from ganeti import mcpu
46 from ganeti import utils
47 from ganeti import jstore
48 from ganeti import rpc
49
50
51 JOBQUEUE_THREADS = 25
52 JOBS_PER_ARCHIVE_DIRECTORY = 10000
53
54
55 class CancelJob(Exception):
56   """Special exception to cancel a job.
57
58   """
59
60
61 def TimeStampNow():
62   """Returns the current timestamp.
63
64   @rtype: tuple
65   @return: the current time in the (seconds, microseconds) format
66
67   """
68   return utils.SplitTime(time.time())
69
70
71 class _QueuedOpCode(object):
72   """Encapsulates an opcode object.
73
74   @ivar log: holds the execution log and consists of tuples
75   of the form C{(log_serial, timestamp, level, message)}
76   @ivar input: the OpCode we encapsulate
77   @ivar status: the current status
78   @ivar result: the result of the LU execution
79   @ivar start_timestamp: timestamp for the start of the execution
80   @ivar stop_timestamp: timestamp for the end of the execution
81
82   """
83   __slots__ = ["input", "status", "result", "log",
84                "start_timestamp", "end_timestamp",
85                "__weakref__"]
86
87   def __init__(self, op):
88     """Constructor for the _QuededOpCode.
89
90     @type op: L{opcodes.OpCode}
91     @param op: the opcode we encapsulate
92
93     """
94     self.input = op
95     self.status = constants.OP_STATUS_QUEUED
96     self.result = None
97     self.log = []
98     self.start_timestamp = None
99     self.end_timestamp = None
100
101   @classmethod
102   def Restore(cls, state):
103     """Restore the _QueuedOpCode from the serialized form.
104
105     @type state: dict
106     @param state: the serialized state
107     @rtype: _QueuedOpCode
108     @return: a new _QueuedOpCode instance
109
110     """
111     obj = _QueuedOpCode.__new__(cls)
112     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
113     obj.status = state["status"]
114     obj.result = state["result"]
115     obj.log = state["log"]
116     obj.start_timestamp = state.get("start_timestamp", None)
117     obj.end_timestamp = state.get("end_timestamp", None)
118     return obj
119
120   def Serialize(self):
121     """Serializes this _QueuedOpCode.
122
123     @rtype: dict
124     @return: the dictionary holding the serialized state
125
126     """
127     return {
128       "input": self.input.__getstate__(),
129       "status": self.status,
130       "result": self.result,
131       "log": self.log,
132       "start_timestamp": self.start_timestamp,
133       "end_timestamp": self.end_timestamp,
134       }
135
136
137 class _QueuedJob(object):
138   """In-memory job representation.
139
140   This is what we use to track the user-submitted jobs. Locking must
141   be taken care of by users of this class.
142
143   @type queue: L{JobQueue}
144   @ivar queue: the parent queue
145   @ivar id: the job ID
146   @type ops: list
147   @ivar ops: the list of _QueuedOpCode that constitute the job
148   @type run_op_index: int
149   @ivar run_op_index: the currently executing opcode, or -1 if
150       we didn't yet start executing
151   @type log_serial: int
152   @ivar log_serial: holds the index for the next log entry
153   @ivar received_timestamp: the timestamp for when the job was received
154   @ivar start_timestmap: the timestamp for start of execution
155   @ivar end_timestamp: the timestamp for end of execution
156   @ivar change: a Condition variable we use for waiting for job changes
157
158   """
159   __slots__ = ["queue", "id", "ops", "run_op_index", "log_serial",
160                "received_timestamp", "start_timestamp", "end_timestamp",
161                "change",
162                "__weakref__"]
163
164   def __init__(self, queue, job_id, ops):
165     """Constructor for the _QueuedJob.
166
167     @type queue: L{JobQueue}
168     @param queue: our parent queue
169     @type job_id: job_id
170     @param job_id: our job id
171     @type ops: list
172     @param ops: the list of opcodes we hold, which will be encapsulated
173         in _QueuedOpCodes
174
175     """
176     if not ops:
177       # TODO: use a better exception
178       raise Exception("No opcodes")
179
180     self.queue = queue
181     self.id = job_id
182     self.ops = [_QueuedOpCode(op) for op in ops]
183     self.run_op_index = -1
184     self.log_serial = 0
185     self.received_timestamp = TimeStampNow()
186     self.start_timestamp = None
187     self.end_timestamp = None
188
189     # Condition to wait for changes
190     self.change = threading.Condition(self.queue._lock)
191
192   @classmethod
193   def Restore(cls, queue, state):
194     """Restore a _QueuedJob from serialized state:
195
196     @type queue: L{JobQueue}
197     @param queue: to which queue the restored job belongs
198     @type state: dict
199     @param state: the serialized state
200     @rtype: _JobQueue
201     @return: the restored _JobQueue instance
202
203     """
204     obj = _QueuedJob.__new__(cls)
205     obj.queue = queue
206     obj.id = state["id"]
207     obj.run_op_index = state["run_op_index"]
208     obj.received_timestamp = state.get("received_timestamp", None)
209     obj.start_timestamp = state.get("start_timestamp", None)
210     obj.end_timestamp = state.get("end_timestamp", None)
211
212     obj.ops = []
213     obj.log_serial = 0
214     for op_state in state["ops"]:
215       op = _QueuedOpCode.Restore(op_state)
216       for log_entry in op.log:
217         obj.log_serial = max(obj.log_serial, log_entry[0])
218       obj.ops.append(op)
219
220     # Condition to wait for changes
221     obj.change = threading.Condition(obj.queue._lock)
222
223     return obj
224
225   def Serialize(self):
226     """Serialize the _JobQueue instance.
227
228     @rtype: dict
229     @return: the serialized state
230
231     """
232     return {
233       "id": self.id,
234       "ops": [op.Serialize() for op in self.ops],
235       "run_op_index": self.run_op_index,
236       "start_timestamp": self.start_timestamp,
237       "end_timestamp": self.end_timestamp,
238       "received_timestamp": self.received_timestamp,
239       }
240
241   def CalcStatus(self):
242     """Compute the status of this job.
243
244     This function iterates over all the _QueuedOpCodes in the job and
245     based on their status, computes the job status.
246
247     The algorithm is:
248       - if we find a cancelled, or finished with error, the job
249         status will be the same
250       - otherwise, the last opcode with the status one of:
251           - waitlock
252           - canceling
253           - running
254
255         will determine the job status
256
257       - otherwise, it means either all opcodes are queued, or success,
258         and the job status will be the same
259
260     @return: the job status
261
262     """
263     status = constants.JOB_STATUS_QUEUED
264
265     all_success = True
266     for op in self.ops:
267       if op.status == constants.OP_STATUS_SUCCESS:
268         continue
269
270       all_success = False
271
272       if op.status == constants.OP_STATUS_QUEUED:
273         pass
274       elif op.status == constants.OP_STATUS_WAITLOCK:
275         status = constants.JOB_STATUS_WAITLOCK
276       elif op.status == constants.OP_STATUS_RUNNING:
277         status = constants.JOB_STATUS_RUNNING
278       elif op.status == constants.OP_STATUS_CANCELING:
279         status = constants.JOB_STATUS_CANCELING
280         break
281       elif op.status == constants.OP_STATUS_ERROR:
282         status = constants.JOB_STATUS_ERROR
283         # The whole job fails if one opcode failed
284         break
285       elif op.status == constants.OP_STATUS_CANCELED:
286         status = constants.OP_STATUS_CANCELED
287         break
288
289     if all_success:
290       status = constants.JOB_STATUS_SUCCESS
291
292     return status
293
294   def GetLogEntries(self, newer_than):
295     """Selectively returns the log entries.
296
297     @type newer_than: None or int
298     @param newer_than: if this is None, return all log entries,
299         otherwise return only the log entries with serial higher
300         than this value
301     @rtype: list
302     @return: the list of the log entries selected
303
304     """
305     if newer_than is None:
306       serial = -1
307     else:
308       serial = newer_than
309
310     entries = []
311     for op in self.ops:
312       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
313
314     return entries
315
316   def MarkUnfinishedOps(self, status, result):
317     """Mark unfinished opcodes with a given status and result.
318
319     This is an utility function for marking all running or waiting to
320     be run opcodes with a given status. Opcodes which are already
321     finalised are not changed.
322
323     @param status: a given opcode status
324     @param result: the opcode result
325
326     """
327     not_marked = True
328     for op in self.ops:
329       if op.status in constants.OPS_FINALIZED:
330         assert not_marked, "Finalized opcodes found after non-finalized ones"
331         continue
332       op.status = status
333       op.result = result
334       not_marked = False
335
336
337 class _JobQueueWorker(workerpool.BaseWorker):
338   """The actual job workers.
339
340   """
341   def _NotifyStart(self):
342     """Mark the opcode as running, not lock-waiting.
343
344     This is called from the mcpu code as a notifier function, when the
345     LU is finally about to start the Exec() method. Of course, to have
346     end-user visible results, the opcode must be initially (before
347     calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
348
349     """
350     assert self.queue, "Queue attribute is missing"
351     assert self.opcode, "Opcode attribute is missing"
352
353     self.queue.acquire()
354     try:
355       assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
356                                     constants.OP_STATUS_CANCELING)
357
358       # Cancel here if we were asked to
359       if self.opcode.status == constants.OP_STATUS_CANCELING:
360         raise CancelJob()
361
362       self.opcode.status = constants.OP_STATUS_RUNNING
363     finally:
364       self.queue.release()
365
366   def RunTask(self, job):
367     """Job executor.
368
369     This functions processes a job. It is closely tied to the _QueuedJob and
370     _QueuedOpCode classes.
371
372     @type job: L{_QueuedJob}
373     @param job: the job to be processed
374
375     """
376     logging.info("Worker %s processing job %s",
377                   self.worker_id, job.id)
378     proc = mcpu.Processor(self.pool.queue.context)
379     self.queue = queue = job.queue
380     try:
381       try:
382         count = len(job.ops)
383         for idx, op in enumerate(job.ops):
384           op_summary = op.input.Summary()
385           if op.status == constants.OP_STATUS_SUCCESS:
386             # this is a job that was partially completed before master
387             # daemon shutdown, so it can be expected that some opcodes
388             # are already completed successfully (if any did error
389             # out, then the whole job should have been aborted and not
390             # resubmitted for processing)
391             logging.info("Op %s/%s: opcode %s already processed, skipping",
392                          idx + 1, count, op_summary)
393             continue
394           try:
395             logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
396                          op_summary)
397
398             queue.acquire()
399             try:
400               if op.status == constants.OP_STATUS_CANCELED:
401                 raise CancelJob()
402               assert op.status == constants.OP_STATUS_QUEUED
403               job.run_op_index = idx
404               op.status = constants.OP_STATUS_WAITLOCK
405               op.result = None
406               op.start_timestamp = TimeStampNow()
407               if idx == 0: # first opcode
408                 job.start_timestamp = op.start_timestamp
409               queue.UpdateJobUnlocked(job)
410
411               input_opcode = op.input
412             finally:
413               queue.release()
414
415             def _Log(*args):
416               """Append a log entry.
417
418               """
419               assert len(args) < 3
420
421               if len(args) == 1:
422                 log_type = constants.ELOG_MESSAGE
423                 log_msg = args[0]
424               else:
425                 log_type, log_msg = args
426
427               # The time is split to make serialization easier and not lose
428               # precision.
429               timestamp = utils.SplitTime(time.time())
430
431               queue.acquire()
432               try:
433                 job.log_serial += 1
434                 op.log.append((job.log_serial, timestamp, log_type, log_msg))
435
436                 job.change.notifyAll()
437               finally:
438                 queue.release()
439
440             # Make sure not to hold lock while _Log is called
441             self.opcode = op
442             result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
443
444             queue.acquire()
445             try:
446               op.status = constants.OP_STATUS_SUCCESS
447               op.result = result
448               op.end_timestamp = TimeStampNow()
449               queue.UpdateJobUnlocked(job)
450             finally:
451               queue.release()
452
453             logging.info("Op %s/%s: Successfully finished opcode %s",
454                          idx + 1, count, op_summary)
455           except CancelJob:
456             # Will be handled further up
457             raise
458           except Exception, err:
459             queue.acquire()
460             try:
461               try:
462                 op.status = constants.OP_STATUS_ERROR
463                 op.result = str(err)
464                 op.end_timestamp = TimeStampNow()
465                 logging.info("Op %s/%s: Error in opcode %s: %s",
466                              idx + 1, count, op_summary, err)
467               finally:
468                 queue.UpdateJobUnlocked(job)
469             finally:
470               queue.release()
471             raise
472
473       except CancelJob:
474         queue.acquire()
475         try:
476           queue.CancelJobUnlocked(job)
477         finally:
478           queue.release()
479       except errors.GenericError, err:
480         logging.exception("Ganeti exception")
481       except:
482         logging.exception("Unhandled exception")
483     finally:
484       queue.acquire()
485       try:
486         try:
487           job.run_op_index = -1
488           job.end_timestamp = TimeStampNow()
489           queue.UpdateJobUnlocked(job)
490         finally:
491           job_id = job.id
492           status = job.CalcStatus()
493       finally:
494         queue.release()
495       logging.info("Worker %s finished job %s, status = %s",
496                    self.worker_id, job_id, status)
497
498
499 class _JobQueueWorkerPool(workerpool.WorkerPool):
500   """Simple class implementing a job-processing workerpool.
501
502   """
503   def __init__(self, queue):
504     super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
505                                               _JobQueueWorker)
506     self.queue = queue
507
508
509 class JobQueue(object):
510   """Queue used to manage the jobs.
511
512   @cvar _RE_JOB_FILE: regex matching the valid job file names
513
514   """
515   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
516
517   def _RequireOpenQueue(fn):
518     """Decorator for "public" functions.
519
520     This function should be used for all 'public' functions. That is,
521     functions usually called from other classes.
522
523     @warning: Use this decorator only after utils.LockedMethod!
524
525     Example::
526       @utils.LockedMethod
527       @_RequireOpenQueue
528       def Example(self):
529         pass
530
531     """
532     def wrapper(self, *args, **kwargs):
533       assert self._queue_lock is not None, "Queue should be open"
534       return fn(self, *args, **kwargs)
535     return wrapper
536
537   def __init__(self, context):
538     """Constructor for JobQueue.
539
540     The constructor will initialize the job queue object and then
541     start loading the current jobs from disk, either for starting them
542     (if they were queue) or for aborting them (if they were already
543     running).
544
545     @type context: GanetiContext
546     @param context: the context object for access to the configuration
547         data and other ganeti objects
548
549     """
550     self.context = context
551     self._memcache = weakref.WeakValueDictionary()
552     self._my_hostname = utils.HostInfo().name
553
554     # Locking
555     self._lock = threading.Lock()
556     self.acquire = self._lock.acquire
557     self.release = self._lock.release
558
559     # Initialize
560     self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
561
562     # Read serial file
563     self._last_serial = jstore.ReadSerial()
564     assert self._last_serial is not None, ("Serial file was modified between"
565                                            " check in jstore and here")
566
567     # Get initial list of nodes
568     self._nodes = dict((n.name, n.primary_ip)
569                        for n in self.context.cfg.GetAllNodesInfo().values()
570                        if n.master_candidate)
571
572     # Remove master node
573     try:
574       del self._nodes[self._my_hostname]
575     except KeyError:
576       pass
577
578     # TODO: Check consistency across nodes
579
580     # Setup worker pool
581     self._wpool = _JobQueueWorkerPool(self)
582     try:
583       # We need to lock here because WorkerPool.AddTask() may start a job while
584       # we're still doing our work.
585       self.acquire()
586       try:
587         logging.info("Inspecting job queue")
588
589         all_job_ids = self._GetJobIDsUnlocked()
590         jobs_count = len(all_job_ids)
591         lastinfo = time.time()
592         for idx, job_id in enumerate(all_job_ids):
593           # Give an update every 1000 jobs or 10 seconds
594           if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
595               idx == (jobs_count - 1)):
596             logging.info("Job queue inspection: %d/%d (%0.1f %%)",
597                          idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
598             lastinfo = time.time()
599
600           job = self._LoadJobUnlocked(job_id)
601
602           # a failure in loading the job can cause 'None' to be returned
603           if job is None:
604             continue
605
606           status = job.CalcStatus()
607
608           if status in (constants.JOB_STATUS_QUEUED, ):
609             self._wpool.AddTask(job)
610
611           elif status in (constants.JOB_STATUS_RUNNING,
612                           constants.JOB_STATUS_WAITLOCK,
613                           constants.JOB_STATUS_CANCELING):
614             logging.warning("Unfinished job %s found: %s", job.id, job)
615             try:
616               job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
617                                     "Unclean master daemon shutdown")
618             finally:
619               self.UpdateJobUnlocked(job)
620
621         logging.info("Job queue inspection finished")
622       finally:
623         self.release()
624     except:
625       self._wpool.TerminateWorkers()
626       raise
627
628   @utils.LockedMethod
629   @_RequireOpenQueue
630   def AddNode(self, node):
631     """Register a new node with the queue.
632
633     @type node: L{objects.Node}
634     @param node: the node object to be added
635
636     """
637     node_name = node.name
638     assert node_name != self._my_hostname
639
640     # Clean queue directory on added node
641     rpc.RpcRunner.call_jobqueue_purge(node_name)
642
643     if not node.master_candidate:
644       # remove if existing, ignoring errors
645       self._nodes.pop(node_name, None)
646       # and skip the replication of the job ids
647       return
648
649     # Upload the whole queue excluding archived jobs
650     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
651
652     # Upload current serial file
653     files.append(constants.JOB_QUEUE_SERIAL_FILE)
654
655     for file_name in files:
656       # Read file content
657       fd = open(file_name, "r")
658       try:
659         content = fd.read()
660       finally:
661         fd.close()
662
663       result = rpc.RpcRunner.call_jobqueue_update([node_name],
664                                                   [node.primary_ip],
665                                                   file_name, content)
666       if not result[node_name]:
667         logging.error("Failed to upload %s to %s", file_name, node_name)
668
669     self._nodes[node_name] = node.primary_ip
670
671   @utils.LockedMethod
672   @_RequireOpenQueue
673   def RemoveNode(self, node_name):
674     """Callback called when removing nodes from the cluster.
675
676     @type node_name: str
677     @param node_name: the name of the node to remove
678
679     """
680     try:
681       # The queue is removed by the "leave node" RPC call.
682       del self._nodes[node_name]
683     except KeyError:
684       pass
685
686   def _CheckRpcResult(self, result, nodes, failmsg):
687     """Verifies the status of an RPC call.
688
689     Since we aim to keep consistency should this node (the current
690     master) fail, we will log errors if our rpc fail, and especially
691     log the case when more than half of the nodes fails.
692
693     @param result: the data as returned from the rpc call
694     @type nodes: list
695     @param nodes: the list of nodes we made the call to
696     @type failmsg: str
697     @param failmsg: the identifier to be used for logging
698
699     """
700     failed = []
701     success = []
702
703     for node in nodes:
704       if result[node]:
705         success.append(node)
706       else:
707         failed.append(node)
708
709     if failed:
710       logging.error("%s failed on %s", failmsg, ", ".join(failed))
711
712     # +1 for the master node
713     if (len(success) + 1) < len(failed):
714       # TODO: Handle failing nodes
715       logging.error("More than half of the nodes failed")
716
717   def _GetNodeIp(self):
718     """Helper for returning the node name/ip list.
719
720     @rtype: (list, list)
721     @return: a tuple of two lists, the first one with the node
722         names and the second one with the node addresses
723
724     """
725     name_list = self._nodes.keys()
726     addr_list = [self._nodes[name] for name in name_list]
727     return name_list, addr_list
728
729   def _WriteAndReplicateFileUnlocked(self, file_name, data):
730     """Writes a file locally and then replicates it to all nodes.
731
732     This function will replace the contents of a file on the local
733     node and then replicate it to all the other nodes we have.
734
735     @type file_name: str
736     @param file_name: the path of the file to be replicated
737     @type data: str
738     @param data: the new contents of the file
739
740     """
741     utils.WriteFile(file_name, data=data)
742
743     names, addrs = self._GetNodeIp()
744     result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
745     self._CheckRpcResult(result, self._nodes,
746                          "Updating %s" % file_name)
747
748   def _RenameFilesUnlocked(self, rename):
749     """Renames a file locally and then replicate the change.
750
751     This function will rename a file in the local queue directory
752     and then replicate this rename to all the other nodes we have.
753
754     @type rename: list of (old, new)
755     @param rename: List containing tuples mapping old to new names
756
757     """
758     # Rename them locally
759     for old, new in rename:
760       utils.RenameFile(old, new, mkdir=True)
761
762     # ... and on all nodes
763     names, addrs = self._GetNodeIp()
764     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
765     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
766
767   def _FormatJobID(self, job_id):
768     """Convert a job ID to string format.
769
770     Currently this just does C{str(job_id)} after performing some
771     checks, but if we want to change the job id format this will
772     abstract this change.
773
774     @type job_id: int or long
775     @param job_id: the numeric job id
776     @rtype: str
777     @return: the formatted job id
778
779     """
780     if not isinstance(job_id, (int, long)):
781       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
782     if job_id < 0:
783       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
784
785     return str(job_id)
786
787   @classmethod
788   def _GetArchiveDirectory(cls, job_id):
789     """Returns the archive directory for a job.
790
791     @type job_id: str
792     @param job_id: Job identifier
793     @rtype: str
794     @return: Directory name
795
796     """
797     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
798
799   def _NewSerialsUnlocked(self, count):
800     """Generates a new job identifier.
801
802     Job identifiers are unique during the lifetime of a cluster.
803
804     @type count: integer
805     @param count: how many serials to return
806     @rtype: str
807     @return: a string representing the job identifier.
808
809     """
810     assert count > 0
811     # New number
812     serial = self._last_serial + count
813
814     # Write to file
815     self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
816                                         "%s\n" % serial)
817
818     result = [self._FormatJobID(v)
819               for v in range(self._last_serial, serial + 1)]
820     # Keep it only if we were able to write the file
821     self._last_serial = serial
822
823     return result
824
825   @staticmethod
826   def _GetJobPath(job_id):
827     """Returns the job file for a given job id.
828
829     @type job_id: str
830     @param job_id: the job identifier
831     @rtype: str
832     @return: the path to the job file
833
834     """
835     return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
836
837   @classmethod
838   def _GetArchivedJobPath(cls, job_id):
839     """Returns the archived job file for a give job id.
840
841     @type job_id: str
842     @param job_id: the job identifier
843     @rtype: str
844     @return: the path to the archived job file
845
846     """
847     path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
848     return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
849
850   @classmethod
851   def _ExtractJobID(cls, name):
852     """Extract the job id from a filename.
853
854     @type name: str
855     @param name: the job filename
856     @rtype: job id or None
857     @return: the job id corresponding to the given filename,
858         or None if the filename does not represent a valid
859         job file
860
861     """
862     m = cls._RE_JOB_FILE.match(name)
863     if m:
864       return m.group(1)
865     else:
866       return None
867
868   def _GetJobIDsUnlocked(self, archived=False):
869     """Return all known job IDs.
870
871     If the parameter archived is True, archived jobs IDs will be
872     included. Currently this argument is unused.
873
874     The method only looks at disk because it's a requirement that all
875     jobs are present on disk (so in the _memcache we don't have any
876     extra IDs).
877
878     @rtype: list
879     @return: the list of job IDs
880
881     """
882     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
883     jlist = utils.NiceSort(jlist)
884     return jlist
885
886   def _ListJobFiles(self):
887     """Returns the list of current job files.
888
889     @rtype: list
890     @return: the list of job file names
891
892     """
893     return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
894             if self._RE_JOB_FILE.match(name)]
895
896   def _LoadJobUnlocked(self, job_id):
897     """Loads a job from the disk or memory.
898
899     Given a job id, this will return the cached job object if
900     existing, or try to load the job from the disk. If loading from
901     disk, it will also add the job to the cache.
902
903     @param job_id: the job id
904     @rtype: L{_QueuedJob} or None
905     @return: either None or the job object
906
907     """
908     job = self._memcache.get(job_id, None)
909     if job:
910       logging.debug("Found job %s in memcache", job_id)
911       return job
912
913     filepath = self._GetJobPath(job_id)
914     logging.debug("Loading job from %s", filepath)
915     try:
916       fd = open(filepath, "r")
917     except IOError, err:
918       if err.errno in (errno.ENOENT, ):
919         return None
920       raise
921     try:
922       data = serializer.LoadJson(fd.read())
923     finally:
924       fd.close()
925
926     try:
927       job = _QueuedJob.Restore(self, data)
928     except Exception, err:
929       new_path = self._GetArchivedJobPath(job_id)
930       if filepath == new_path:
931         # job already archived (future case)
932         logging.exception("Can't parse job %s", job_id)
933       else:
934         # non-archived case
935         logging.exception("Can't parse job %s, will archive.", job_id)
936         self._RenameFilesUnlocked([(filepath, new_path)])
937       return None
938
939     self._memcache[job_id] = job
940     logging.debug("Added job %s to the cache", job_id)
941     return job
942
943   def _GetJobsUnlocked(self, job_ids):
944     """Return a list of jobs based on their IDs.
945
946     @type job_ids: list
947     @param job_ids: either an empty list (meaning all jobs),
948         or a list of job IDs
949     @rtype: list
950     @return: the list of job objects
951
952     """
953     if not job_ids:
954       job_ids = self._GetJobIDsUnlocked()
955
956     return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
957
958   @staticmethod
959   def _IsQueueMarkedDrain():
960     """Check if the queue is marked from drain.
961
962     This currently uses the queue drain file, which makes it a
963     per-node flag. In the future this can be moved to the config file.
964
965     @rtype: boolean
966     @return: True of the job queue is marked for draining
967
968     """
969     return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
970
971   @staticmethod
972   def SetDrainFlag(drain_flag):
973     """Sets the drain flag for the queue.
974
975     This is similar to the function L{backend.JobQueueSetDrainFlag},
976     and in the future we might merge them.
977
978     @type drain_flag: boolean
979     @param drain_flag: Whether to set or unset the drain flag
980
981     """
982     if drain_flag:
983       utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
984     else:
985       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
986     return True
987
988   @_RequireOpenQueue
989   def _SubmitJobUnlocked(self, job_id, ops):
990     """Create and store a new job.
991
992     This enters the job into our job queue and also puts it on the new
993     queue, in order for it to be picked up by the queue processors.
994
995     @type job_id: job ID
996     @param jod_id: the job ID for the new job
997     @type ops: list
998     @param ops: The list of OpCodes that will become the new job.
999     @rtype: job ID
1000     @return: the job ID of the newly created job
1001     @raise errors.JobQueueDrainError: if the job is marked for draining
1002
1003     """
1004     if self._IsQueueMarkedDrain():
1005       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1006
1007     # Check job queue size
1008     size = len(self._ListJobFiles())
1009     if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
1010       # TODO: Autoarchive jobs. Make sure it's not done on every job
1011       # submission, though.
1012       #size = ...
1013       pass
1014
1015     if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1016       raise errors.JobQueueFull()
1017
1018     job = _QueuedJob(self, job_id, ops)
1019
1020     # Write to disk
1021     self.UpdateJobUnlocked(job)
1022
1023     logging.debug("Adding new job %s to the cache", job_id)
1024     self._memcache[job_id] = job
1025
1026     # Add to worker pool
1027     self._wpool.AddTask(job)
1028
1029     return job.id
1030
1031   @utils.LockedMethod
1032   @_RequireOpenQueue
1033   def SubmitJob(self, ops):
1034     """Create and store a new job.
1035
1036     @see: L{_SubmitJobUnlocked}
1037
1038     """
1039     job_id = self._NewSerialsUnlocked(1)[0]
1040     return self._SubmitJobUnlocked(job_id, ops)
1041
1042   @utils.LockedMethod
1043   @_RequireOpenQueue
1044   def SubmitManyJobs(self, jobs):
1045     """Create and store multiple jobs.
1046
1047     @see: L{_SubmitJobUnlocked}
1048
1049     """
1050     results = []
1051     all_job_ids = self._NewSerialsUnlocked(len(jobs))
1052     for job_id, ops in zip(all_job_ids, jobs):
1053       try:
1054         data = self._SubmitJobUnlocked(job_id, ops)
1055         status = True
1056       except errors.GenericError, err:
1057         data = str(err)
1058         status = False
1059       results.append((status, data))
1060
1061     return results
1062
1063
1064   @_RequireOpenQueue
1065   def UpdateJobUnlocked(self, job):
1066     """Update a job's on disk storage.
1067
1068     After a job has been modified, this function needs to be called in
1069     order to write the changes to disk and replicate them to the other
1070     nodes.
1071
1072     @type job: L{_QueuedJob}
1073     @param job: the changed job
1074
1075     """
1076     filename = self._GetJobPath(job.id)
1077     data = serializer.DumpJson(job.Serialize(), indent=False)
1078     logging.debug("Writing job %s to %s", job.id, filename)
1079     self._WriteAndReplicateFileUnlocked(filename, data)
1080
1081     # Notify waiters about potential changes
1082     job.change.notifyAll()
1083
1084   @utils.LockedMethod
1085   @_RequireOpenQueue
1086   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1087                         timeout):
1088     """Waits for changes in a job.
1089
1090     @type job_id: string
1091     @param job_id: Job identifier
1092     @type fields: list of strings
1093     @param fields: Which fields to check for changes
1094     @type prev_job_info: list or None
1095     @param prev_job_info: Last job information returned
1096     @type prev_log_serial: int
1097     @param prev_log_serial: Last job message serial number
1098     @type timeout: float
1099     @param timeout: maximum time to wait
1100     @rtype: tuple (job info, log entries)
1101     @return: a tuple of the job information as required via
1102         the fields parameter, and the log entries as a list
1103
1104         if the job has not changed and the timeout has expired,
1105         we instead return a special value,
1106         L{constants.JOB_NOTCHANGED}, which should be interpreted
1107         as such by the clients
1108
1109     """
1110     logging.debug("Waiting for changes in job %s", job_id)
1111
1112     job_info = None
1113     log_entries = None
1114
1115     end_time = time.time() + timeout
1116     while True:
1117       delta_time = end_time - time.time()
1118       if delta_time < 0:
1119         return constants.JOB_NOTCHANGED
1120
1121       job = self._LoadJobUnlocked(job_id)
1122       if not job:
1123         logging.debug("Job %s not found", job_id)
1124         break
1125
1126       status = job.CalcStatus()
1127       job_info = self._GetJobInfoUnlocked(job, fields)
1128       log_entries = job.GetLogEntries(prev_log_serial)
1129
1130       # Serializing and deserializing data can cause type changes (e.g. from
1131       # tuple to list) or precision loss. We're doing it here so that we get
1132       # the same modifications as the data received from the client. Without
1133       # this, the comparison afterwards might fail without the data being
1134       # significantly different.
1135       job_info = serializer.LoadJson(serializer.DumpJson(job_info))
1136       log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
1137
1138       if status not in (constants.JOB_STATUS_QUEUED,
1139                         constants.JOB_STATUS_RUNNING,
1140                         constants.JOB_STATUS_WAITLOCK):
1141         # Don't even try to wait if the job is no longer running, there will be
1142         # no changes.
1143         break
1144
1145       if (prev_job_info != job_info or
1146           (log_entries and prev_log_serial != log_entries[0][0])):
1147         break
1148
1149       logging.debug("Waiting again")
1150
1151       # Release the queue lock while waiting
1152       job.change.wait(delta_time)
1153
1154     logging.debug("Job %s changed", job_id)
1155
1156     if job_info is None and log_entries is None:
1157       return None
1158     else:
1159       return (job_info, log_entries)
1160
1161   @utils.LockedMethod
1162   @_RequireOpenQueue
1163   def CancelJob(self, job_id):
1164     """Cancels a job.
1165
1166     This will only succeed if the job has not started yet.
1167
1168     @type job_id: string
1169     @param job_id: job ID of job to be cancelled.
1170
1171     """
1172     logging.info("Cancelling job %s", job_id)
1173
1174     job = self._LoadJobUnlocked(job_id)
1175     if not job:
1176       logging.debug("Job %s not found", job_id)
1177       return (False, "Job %s not found" % job_id)
1178
1179     job_status = job.CalcStatus()
1180
1181     if job_status not in (constants.JOB_STATUS_QUEUED,
1182                           constants.JOB_STATUS_WAITLOCK):
1183       logging.debug("Job %s is no longer waiting in the queue", job.id)
1184       return (False, "Job %s is no longer waiting in the queue" % job.id)
1185
1186     if job_status == constants.JOB_STATUS_QUEUED:
1187       self.CancelJobUnlocked(job)
1188       return (True, "Job %s canceled" % job.id)
1189
1190     elif job_status == constants.JOB_STATUS_WAITLOCK:
1191       # The worker will notice the new status and cancel the job
1192       try:
1193         job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1194       finally:
1195         self.UpdateJobUnlocked(job)
1196       return (True, "Job %s will be canceled" % job.id)
1197
1198   @_RequireOpenQueue
1199   def CancelJobUnlocked(self, job):
1200     """Marks a job as canceled.
1201
1202     """
1203     try:
1204       job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1205                             "Job canceled by request")
1206     finally:
1207       self.UpdateJobUnlocked(job)
1208
1209   @_RequireOpenQueue
1210   def _ArchiveJobsUnlocked(self, jobs):
1211     """Archives jobs.
1212
1213     @type jobs: list of L{_QueuedJob}
1214     @param jobs: Job objects
1215     @rtype: int
1216     @return: Number of archived jobs
1217
1218     """
1219     archive_jobs = []
1220     rename_files = []
1221     for job in jobs:
1222       if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1223                                   constants.JOB_STATUS_SUCCESS,
1224                                   constants.JOB_STATUS_ERROR):
1225         logging.debug("Job %s is not yet done", job.id)
1226         continue
1227
1228       archive_jobs.append(job)
1229
1230       old = self._GetJobPath(job.id)
1231       new = self._GetArchivedJobPath(job.id)
1232       rename_files.append((old, new))
1233
1234     # TODO: What if 1..n files fail to rename?
1235     self._RenameFilesUnlocked(rename_files)
1236
1237     logging.debug("Successfully archived job(s) %s",
1238                   ", ".join(job.id for job in archive_jobs))
1239
1240     return len(archive_jobs)
1241
1242   @utils.LockedMethod
1243   @_RequireOpenQueue
1244   def ArchiveJob(self, job_id):
1245     """Archives a job.
1246
1247     This is just a wrapper over L{_ArchiveJobsUnlocked}.
1248
1249     @type job_id: string
1250     @param job_id: Job ID of job to be archived.
1251     @rtype: bool
1252     @return: Whether job was archived
1253
1254     """
1255     logging.info("Archiving job %s", job_id)
1256
1257     job = self._LoadJobUnlocked(job_id)
1258     if not job:
1259       logging.debug("Job %s not found", job_id)
1260       return False
1261
1262     return self._ArchiveJobsUnlocked([job]) == 1
1263
1264   @utils.LockedMethod
1265   @_RequireOpenQueue
1266   def AutoArchiveJobs(self, age, timeout):
1267     """Archives all jobs based on age.
1268
1269     The method will archive all jobs which are older than the age
1270     parameter. For jobs that don't have an end timestamp, the start
1271     timestamp will be considered. The special '-1' age will cause
1272     archival of all jobs (that are not running or queued).
1273
1274     @type age: int
1275     @param age: the minimum age in seconds
1276
1277     """
1278     logging.info("Archiving jobs with age more than %s seconds", age)
1279
1280     now = time.time()
1281     end_time = now + timeout
1282     archived_count = 0
1283     last_touched = 0
1284
1285     all_job_ids = self._GetJobIDsUnlocked(archived=False)
1286     pending = []
1287     for idx, job_id in enumerate(all_job_ids):
1288       last_touched = idx
1289
1290       # Not optimal because jobs could be pending
1291       # TODO: Measure average duration for job archival and take number of
1292       # pending jobs into account.
1293       if time.time() > end_time:
1294         break
1295
1296       # Returns None if the job failed to load
1297       job = self._LoadJobUnlocked(job_id)
1298       if job:
1299         if job.end_timestamp is None:
1300           if job.start_timestamp is None:
1301             job_age = job.received_timestamp
1302           else:
1303             job_age = job.start_timestamp
1304         else:
1305           job_age = job.end_timestamp
1306
1307         if age == -1 or now - job_age[0] > age:
1308           pending.append(job)
1309
1310           # Archive 10 jobs at a time
1311           if len(pending) >= 10:
1312             archived_count += self._ArchiveJobsUnlocked(pending)
1313             pending = []
1314
1315     if pending:
1316       archived_count += self._ArchiveJobsUnlocked(pending)
1317
1318     return (archived_count, len(all_job_ids) - last_touched - 1)
1319
1320   def _GetJobInfoUnlocked(self, job, fields):
1321     """Returns information about a job.
1322
1323     @type job: L{_QueuedJob}
1324     @param job: the job which we query
1325     @type fields: list
1326     @param fields: names of fields to return
1327     @rtype: list
1328     @return: list with one element for each field
1329     @raise errors.OpExecError: when an invalid field
1330         has been passed
1331
1332     """
1333     row = []
1334     for fname in fields:
1335       if fname == "id":
1336         row.append(job.id)
1337       elif fname == "status":
1338         row.append(job.CalcStatus())
1339       elif fname == "ops":
1340         row.append([op.input.__getstate__() for op in job.ops])
1341       elif fname == "opresult":
1342         row.append([op.result for op in job.ops])
1343       elif fname == "opstatus":
1344         row.append([op.status for op in job.ops])
1345       elif fname == "oplog":
1346         row.append([op.log for op in job.ops])
1347       elif fname == "opstart":
1348         row.append([op.start_timestamp for op in job.ops])
1349       elif fname == "opend":
1350         row.append([op.end_timestamp for op in job.ops])
1351       elif fname == "received_ts":
1352         row.append(job.received_timestamp)
1353       elif fname == "start_ts":
1354         row.append(job.start_timestamp)
1355       elif fname == "end_ts":
1356         row.append(job.end_timestamp)
1357       elif fname == "summary":
1358         row.append([op.input.Summary() for op in job.ops])
1359       else:
1360         raise errors.OpExecError("Invalid job query field '%s'" % fname)
1361     return row
1362
1363   @utils.LockedMethod
1364   @_RequireOpenQueue
1365   def QueryJobs(self, job_ids, fields):
1366     """Returns a list of jobs in queue.
1367
1368     This is a wrapper of L{_GetJobsUnlocked}, which actually does the
1369     processing for each job.
1370
1371     @type job_ids: list
1372     @param job_ids: sequence of job identifiers or None for all
1373     @type fields: list
1374     @param fields: names of fields to return
1375     @rtype: list
1376     @return: list one element per job, each element being list with
1377         the requested fields
1378
1379     """
1380     jobs = []
1381
1382     for job in self._GetJobsUnlocked(job_ids):
1383       if job is None:
1384         jobs.append(None)
1385       else:
1386         jobs.append(self._GetJobInfoUnlocked(job, fields))
1387
1388     return jobs
1389
1390   @utils.LockedMethod
1391   @_RequireOpenQueue
1392   def Shutdown(self):
1393     """Stops the job queue.
1394
1395     This shutdowns all the worker threads an closes the queue.
1396
1397     """
1398     self._wpool.TerminateWorkers()
1399
1400     self._queue_lock.Close()
1401     self._queue_lock = None