job queue: fix interrupted job processing
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import os
33 import logging
34 import threading
35 import errno
36 import re
37 import time
38 import weakref
39
40 from ganeti import constants
41 from ganeti import serializer
42 from ganeti import workerpool
43 from ganeti import opcodes
44 from ganeti import errors
45 from ganeti import mcpu
46 from ganeti import utils
47 from ganeti import jstore
48 from ganeti import rpc
49
50
51 JOBQUEUE_THREADS = 25
52 JOBS_PER_ARCHIVE_DIRECTORY = 10000
53
54
55 class CancelJob(Exception):
56   """Special exception to cancel a job.
57
58   """
59
60
61 def TimeStampNow():
62   """Returns the current timestamp.
63
64   @rtype: tuple
65   @return: the current time in the (seconds, microseconds) format
66
67   """
68   return utils.SplitTime(time.time())
69
70
71 class _QueuedOpCode(object):
72   """Encapsulates an opcode object.
73
74   @ivar log: holds the execution log and consists of tuples
75   of the form C{(log_serial, timestamp, level, message)}
76   @ivar input: the OpCode we encapsulate
77   @ivar status: the current status
78   @ivar result: the result of the LU execution
79   @ivar start_timestamp: timestamp for the start of the execution
80   @ivar stop_timestamp: timestamp for the end of the execution
81
82   """
83   __slots__ = ["input", "status", "result", "log",
84                "start_timestamp", "end_timestamp",
85                "__weakref__"]
86
87   def __init__(self, op):
88     """Constructor for the _QuededOpCode.
89
90     @type op: L{opcodes.OpCode}
91     @param op: the opcode we encapsulate
92
93     """
94     self.input = op
95     self.status = constants.OP_STATUS_QUEUED
96     self.result = None
97     self.log = []
98     self.start_timestamp = None
99     self.end_timestamp = None
100
101   @classmethod
102   def Restore(cls, state):
103     """Restore the _QueuedOpCode from the serialized form.
104
105     @type state: dict
106     @param state: the serialized state
107     @rtype: _QueuedOpCode
108     @return: a new _QueuedOpCode instance
109
110     """
111     obj = _QueuedOpCode.__new__(cls)
112     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
113     obj.status = state["status"]
114     obj.result = state["result"]
115     obj.log = state["log"]
116     obj.start_timestamp = state.get("start_timestamp", None)
117     obj.end_timestamp = state.get("end_timestamp", None)
118     return obj
119
120   def Serialize(self):
121     """Serializes this _QueuedOpCode.
122
123     @rtype: dict
124     @return: the dictionary holding the serialized state
125
126     """
127     return {
128       "input": self.input.__getstate__(),
129       "status": self.status,
130       "result": self.result,
131       "log": self.log,
132       "start_timestamp": self.start_timestamp,
133       "end_timestamp": self.end_timestamp,
134       }
135
136
137 class _QueuedJob(object):
138   """In-memory job representation.
139
140   This is what we use to track the user-submitted jobs. Locking must
141   be taken care of by users of this class.
142
143   @type queue: L{JobQueue}
144   @ivar queue: the parent queue
145   @ivar id: the job ID
146   @type ops: list
147   @ivar ops: the list of _QueuedOpCode that constitute the job
148   @type run_op_index: int
149   @ivar run_op_index: the currently executing opcode, or -1 if
150       we didn't yet start executing
151   @type log_serial: int
152   @ivar log_serial: holds the index for the next log entry
153   @ivar received_timestamp: the timestamp for when the job was received
154   @ivar start_timestmap: the timestamp for start of execution
155   @ivar end_timestamp: the timestamp for end of execution
156   @ivar change: a Condition variable we use for waiting for job changes
157
158   """
159   __slots__ = ["queue", "id", "ops", "run_op_index", "log_serial",
160                "received_timestamp", "start_timestamp", "end_timestamp",
161                "change",
162                "__weakref__"]
163
164   def __init__(self, queue, job_id, ops):
165     """Constructor for the _QueuedJob.
166
167     @type queue: L{JobQueue}
168     @param queue: our parent queue
169     @type job_id: job_id
170     @param job_id: our job id
171     @type ops: list
172     @param ops: the list of opcodes we hold, which will be encapsulated
173         in _QueuedOpCodes
174
175     """
176     if not ops:
177       # TODO: use a better exception
178       raise Exception("No opcodes")
179
180     self.queue = queue
181     self.id = job_id
182     self.ops = [_QueuedOpCode(op) for op in ops]
183     self.run_op_index = -1
184     self.log_serial = 0
185     self.received_timestamp = TimeStampNow()
186     self.start_timestamp = None
187     self.end_timestamp = None
188
189     # Condition to wait for changes
190     self.change = threading.Condition(self.queue._lock)
191
192   @classmethod
193   def Restore(cls, queue, state):
194     """Restore a _QueuedJob from serialized state:
195
196     @type queue: L{JobQueue}
197     @param queue: to which queue the restored job belongs
198     @type state: dict
199     @param state: the serialized state
200     @rtype: _JobQueue
201     @return: the restored _JobQueue instance
202
203     """
204     obj = _QueuedJob.__new__(cls)
205     obj.queue = queue
206     obj.id = state["id"]
207     obj.run_op_index = state["run_op_index"]
208     obj.received_timestamp = state.get("received_timestamp", None)
209     obj.start_timestamp = state.get("start_timestamp", None)
210     obj.end_timestamp = state.get("end_timestamp", None)
211
212     obj.ops = []
213     obj.log_serial = 0
214     for op_state in state["ops"]:
215       op = _QueuedOpCode.Restore(op_state)
216       for log_entry in op.log:
217         obj.log_serial = max(obj.log_serial, log_entry[0])
218       obj.ops.append(op)
219
220     # Condition to wait for changes
221     obj.change = threading.Condition(obj.queue._lock)
222
223     return obj
224
225   def Serialize(self):
226     """Serialize the _JobQueue instance.
227
228     @rtype: dict
229     @return: the serialized state
230
231     """
232     return {
233       "id": self.id,
234       "ops": [op.Serialize() for op in self.ops],
235       "run_op_index": self.run_op_index,
236       "start_timestamp": self.start_timestamp,
237       "end_timestamp": self.end_timestamp,
238       "received_timestamp": self.received_timestamp,
239       }
240
241   def CalcStatus(self):
242     """Compute the status of this job.
243
244     This function iterates over all the _QueuedOpCodes in the job and
245     based on their status, computes the job status.
246
247     The algorithm is:
248       - if we find a cancelled, or finished with error, the job
249         status will be the same
250       - otherwise, the last opcode with the status one of:
251           - waitlock
252           - canceling
253           - running
254
255         will determine the job status
256
257       - otherwise, it means either all opcodes are queued, or success,
258         and the job status will be the same
259
260     @return: the job status
261
262     """
263     status = constants.JOB_STATUS_QUEUED
264
265     all_success = True
266     for op in self.ops:
267       if op.status == constants.OP_STATUS_SUCCESS:
268         continue
269
270       all_success = False
271
272       if op.status == constants.OP_STATUS_QUEUED:
273         pass
274       elif op.status == constants.OP_STATUS_WAITLOCK:
275         status = constants.JOB_STATUS_WAITLOCK
276       elif op.status == constants.OP_STATUS_RUNNING:
277         status = constants.JOB_STATUS_RUNNING
278       elif op.status == constants.OP_STATUS_CANCELING:
279         status = constants.JOB_STATUS_CANCELING
280         break
281       elif op.status == constants.OP_STATUS_ERROR:
282         status = constants.JOB_STATUS_ERROR
283         # The whole job fails if one opcode failed
284         break
285       elif op.status == constants.OP_STATUS_CANCELED:
286         status = constants.OP_STATUS_CANCELED
287         break
288
289     if all_success:
290       status = constants.JOB_STATUS_SUCCESS
291
292     return status
293
294   def GetLogEntries(self, newer_than):
295     """Selectively returns the log entries.
296
297     @type newer_than: None or int
298     @param newer_than: if this is None, return all log entries,
299         otherwise return only the log entries with serial higher
300         than this value
301     @rtype: list
302     @return: the list of the log entries selected
303
304     """
305     if newer_than is None:
306       serial = -1
307     else:
308       serial = newer_than
309
310     entries = []
311     for op in self.ops:
312       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
313
314     return entries
315
316
317 class _JobQueueWorker(workerpool.BaseWorker):
318   """The actual job workers.
319
320   """
321   def _NotifyStart(self):
322     """Mark the opcode as running, not lock-waiting.
323
324     This is called from the mcpu code as a notifier function, when the
325     LU is finally about to start the Exec() method. Of course, to have
326     end-user visible results, the opcode must be initially (before
327     calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
328
329     """
330     assert self.queue, "Queue attribute is missing"
331     assert self.opcode, "Opcode attribute is missing"
332
333     self.queue.acquire()
334     try:
335       assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
336                                     constants.OP_STATUS_CANCELING)
337
338       # Cancel here if we were asked to
339       if self.opcode.status == constants.OP_STATUS_CANCELING:
340         raise CancelJob()
341
342       self.opcode.status = constants.OP_STATUS_RUNNING
343     finally:
344       self.queue.release()
345
346   def RunTask(self, job):
347     """Job executor.
348
349     This functions processes a job. It is closely tied to the _QueuedJob and
350     _QueuedOpCode classes.
351
352     @type job: L{_QueuedJob}
353     @param job: the job to be processed
354
355     """
356     logging.info("Worker %s processing job %s",
357                   self.worker_id, job.id)
358     proc = mcpu.Processor(self.pool.queue.context)
359     self.queue = queue = job.queue
360     try:
361       try:
362         count = len(job.ops)
363         for idx, op in enumerate(job.ops):
364           op_summary = op.input.Summary()
365           if op.status == constants.OP_STATUS_SUCCESS:
366             # this is a job that was partially completed before master
367             # daemon shutdown, so it can be expected that some opcodes
368             # are already completed successfully (if any did error
369             # out, then the whole job should have been aborted and not
370             # resubmitted for processing)
371             logging.info("Op %s/%s: opcode %s already processed, skipping",
372                          idx + 1, count, op_summary)
373             continue
374           try:
375             logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
376                          op_summary)
377
378             queue.acquire()
379             try:
380               if op.status == constants.OP_STATUS_CANCELED:
381                 raise CancelJob()
382               assert op.status == constants.OP_STATUS_QUEUED
383               job.run_op_index = idx
384               op.status = constants.OP_STATUS_WAITLOCK
385               op.result = None
386               op.start_timestamp = TimeStampNow()
387               if idx == 0: # first opcode
388                 job.start_timestamp = op.start_timestamp
389               queue.UpdateJobUnlocked(job)
390
391               input_opcode = op.input
392             finally:
393               queue.release()
394
395             def _Log(*args):
396               """Append a log entry.
397
398               """
399               assert len(args) < 3
400
401               if len(args) == 1:
402                 log_type = constants.ELOG_MESSAGE
403                 log_msg = args[0]
404               else:
405                 log_type, log_msg = args
406
407               # The time is split to make serialization easier and not lose
408               # precision.
409               timestamp = utils.SplitTime(time.time())
410
411               queue.acquire()
412               try:
413                 job.log_serial += 1
414                 op.log.append((job.log_serial, timestamp, log_type, log_msg))
415
416                 job.change.notifyAll()
417               finally:
418                 queue.release()
419
420             # Make sure not to hold lock while _Log is called
421             self.opcode = op
422             result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
423
424             queue.acquire()
425             try:
426               op.status = constants.OP_STATUS_SUCCESS
427               op.result = result
428               op.end_timestamp = TimeStampNow()
429               queue.UpdateJobUnlocked(job)
430             finally:
431               queue.release()
432
433             logging.info("Op %s/%s: Successfully finished opcode %s",
434                          idx + 1, count, op_summary)
435           except CancelJob:
436             # Will be handled further up
437             raise
438           except Exception, err:
439             queue.acquire()
440             try:
441               try:
442                 op.status = constants.OP_STATUS_ERROR
443                 op.result = str(err)
444                 op.end_timestamp = TimeStampNow()
445                 logging.info("Op %s/%s: Error in opcode %s: %s",
446                              idx + 1, count, op_summary, err)
447               finally:
448                 queue.UpdateJobUnlocked(job)
449             finally:
450               queue.release()
451             raise
452
453       except CancelJob:
454         queue.acquire()
455         try:
456           queue.CancelJobUnlocked(job)
457         finally:
458           queue.release()
459       except errors.GenericError, err:
460         logging.exception("Ganeti exception")
461       except:
462         logging.exception("Unhandled exception")
463     finally:
464       queue.acquire()
465       try:
466         try:
467           job.run_op_index = -1
468           job.end_timestamp = TimeStampNow()
469           queue.UpdateJobUnlocked(job)
470         finally:
471           job_id = job.id
472           status = job.CalcStatus()
473       finally:
474         queue.release()
475       logging.info("Worker %s finished job %s, status = %s",
476                    self.worker_id, job_id, status)
477
478
479 class _JobQueueWorkerPool(workerpool.WorkerPool):
480   """Simple class implementing a job-processing workerpool.
481
482   """
483   def __init__(self, queue):
484     super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
485                                               _JobQueueWorker)
486     self.queue = queue
487
488
489 class JobQueue(object):
490   """Queue used to manage the jobs.
491
492   @cvar _RE_JOB_FILE: regex matching the valid job file names
493
494   """
495   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
496
497   def _RequireOpenQueue(fn):
498     """Decorator for "public" functions.
499
500     This function should be used for all 'public' functions. That is,
501     functions usually called from other classes.
502
503     @warning: Use this decorator only after utils.LockedMethod!
504
505     Example::
506       @utils.LockedMethod
507       @_RequireOpenQueue
508       def Example(self):
509         pass
510
511     """
512     def wrapper(self, *args, **kwargs):
513       assert self._queue_lock is not None, "Queue should be open"
514       return fn(self, *args, **kwargs)
515     return wrapper
516
517   def __init__(self, context):
518     """Constructor for JobQueue.
519
520     The constructor will initialize the job queue object and then
521     start loading the current jobs from disk, either for starting them
522     (if they were queue) or for aborting them (if they were already
523     running).
524
525     @type context: GanetiContext
526     @param context: the context object for access to the configuration
527         data and other ganeti objects
528
529     """
530     self.context = context
531     self._memcache = weakref.WeakValueDictionary()
532     self._my_hostname = utils.HostInfo().name
533
534     # Locking
535     self._lock = threading.Lock()
536     self.acquire = self._lock.acquire
537     self.release = self._lock.release
538
539     # Initialize
540     self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
541
542     # Read serial file
543     self._last_serial = jstore.ReadSerial()
544     assert self._last_serial is not None, ("Serial file was modified between"
545                                            " check in jstore and here")
546
547     # Get initial list of nodes
548     self._nodes = dict((n.name, n.primary_ip)
549                        for n in self.context.cfg.GetAllNodesInfo().values()
550                        if n.master_candidate)
551
552     # Remove master node
553     try:
554       del self._nodes[self._my_hostname]
555     except KeyError:
556       pass
557
558     # TODO: Check consistency across nodes
559
560     # Setup worker pool
561     self._wpool = _JobQueueWorkerPool(self)
562     try:
563       # We need to lock here because WorkerPool.AddTask() may start a job while
564       # we're still doing our work.
565       self.acquire()
566       try:
567         logging.info("Inspecting job queue")
568
569         all_job_ids = self._GetJobIDsUnlocked()
570         jobs_count = len(all_job_ids)
571         lastinfo = time.time()
572         for idx, job_id in enumerate(all_job_ids):
573           # Give an update every 1000 jobs or 10 seconds
574           if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
575               idx == (jobs_count - 1)):
576             logging.info("Job queue inspection: %d/%d (%0.1f %%)",
577                          idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
578             lastinfo = time.time()
579
580           job = self._LoadJobUnlocked(job_id)
581
582           # a failure in loading the job can cause 'None' to be returned
583           if job is None:
584             continue
585
586           status = job.CalcStatus()
587
588           if status in (constants.JOB_STATUS_QUEUED, ):
589             self._wpool.AddTask(job)
590
591           elif status in (constants.JOB_STATUS_RUNNING,
592                           constants.JOB_STATUS_WAITLOCK,
593                           constants.JOB_STATUS_CANCELING):
594             logging.warning("Unfinished job %s found: %s", job.id, job)
595             try:
596               for op in job.ops:
597                 op.status = constants.OP_STATUS_ERROR
598                 op.result = "Unclean master daemon shutdown"
599             finally:
600               self.UpdateJobUnlocked(job)
601
602         logging.info("Job queue inspection finished")
603       finally:
604         self.release()
605     except:
606       self._wpool.TerminateWorkers()
607       raise
608
609   @utils.LockedMethod
610   @_RequireOpenQueue
611   def AddNode(self, node):
612     """Register a new node with the queue.
613
614     @type node: L{objects.Node}
615     @param node: the node object to be added
616
617     """
618     node_name = node.name
619     assert node_name != self._my_hostname
620
621     # Clean queue directory on added node
622     rpc.RpcRunner.call_jobqueue_purge(node_name)
623
624     if not node.master_candidate:
625       # remove if existing, ignoring errors
626       self._nodes.pop(node_name, None)
627       # and skip the replication of the job ids
628       return
629
630     # Upload the whole queue excluding archived jobs
631     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
632
633     # Upload current serial file
634     files.append(constants.JOB_QUEUE_SERIAL_FILE)
635
636     for file_name in files:
637       # Read file content
638       fd = open(file_name, "r")
639       try:
640         content = fd.read()
641       finally:
642         fd.close()
643
644       result = rpc.RpcRunner.call_jobqueue_update([node_name],
645                                                   [node.primary_ip],
646                                                   file_name, content)
647       if not result[node_name]:
648         logging.error("Failed to upload %s to %s", file_name, node_name)
649
650     self._nodes[node_name] = node.primary_ip
651
652   @utils.LockedMethod
653   @_RequireOpenQueue
654   def RemoveNode(self, node_name):
655     """Callback called when removing nodes from the cluster.
656
657     @type node_name: str
658     @param node_name: the name of the node to remove
659
660     """
661     try:
662       # The queue is removed by the "leave node" RPC call.
663       del self._nodes[node_name]
664     except KeyError:
665       pass
666
667   def _CheckRpcResult(self, result, nodes, failmsg):
668     """Verifies the status of an RPC call.
669
670     Since we aim to keep consistency should this node (the current
671     master) fail, we will log errors if our rpc fail, and especially
672     log the case when more than half of the nodes fails.
673
674     @param result: the data as returned from the rpc call
675     @type nodes: list
676     @param nodes: the list of nodes we made the call to
677     @type failmsg: str
678     @param failmsg: the identifier to be used for logging
679
680     """
681     failed = []
682     success = []
683
684     for node in nodes:
685       if result[node]:
686         success.append(node)
687       else:
688         failed.append(node)
689
690     if failed:
691       logging.error("%s failed on %s", failmsg, ", ".join(failed))
692
693     # +1 for the master node
694     if (len(success) + 1) < len(failed):
695       # TODO: Handle failing nodes
696       logging.error("More than half of the nodes failed")
697
698   def _GetNodeIp(self):
699     """Helper for returning the node name/ip list.
700
701     @rtype: (list, list)
702     @return: a tuple of two lists, the first one with the node
703         names and the second one with the node addresses
704
705     """
706     name_list = self._nodes.keys()
707     addr_list = [self._nodes[name] for name in name_list]
708     return name_list, addr_list
709
710   def _WriteAndReplicateFileUnlocked(self, file_name, data):
711     """Writes a file locally and then replicates it to all nodes.
712
713     This function will replace the contents of a file on the local
714     node and then replicate it to all the other nodes we have.
715
716     @type file_name: str
717     @param file_name: the path of the file to be replicated
718     @type data: str
719     @param data: the new contents of the file
720
721     """
722     utils.WriteFile(file_name, data=data)
723
724     names, addrs = self._GetNodeIp()
725     result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
726     self._CheckRpcResult(result, self._nodes,
727                          "Updating %s" % file_name)
728
729   def _RenameFilesUnlocked(self, rename):
730     """Renames a file locally and then replicate the change.
731
732     This function will rename a file in the local queue directory
733     and then replicate this rename to all the other nodes we have.
734
735     @type rename: list of (old, new)
736     @param rename: List containing tuples mapping old to new names
737
738     """
739     # Rename them locally
740     for old, new in rename:
741       utils.RenameFile(old, new, mkdir=True)
742
743     # ... and on all nodes
744     names, addrs = self._GetNodeIp()
745     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
746     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
747
748   def _FormatJobID(self, job_id):
749     """Convert a job ID to string format.
750
751     Currently this just does C{str(job_id)} after performing some
752     checks, but if we want to change the job id format this will
753     abstract this change.
754
755     @type job_id: int or long
756     @param job_id: the numeric job id
757     @rtype: str
758     @return: the formatted job id
759
760     """
761     if not isinstance(job_id, (int, long)):
762       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
763     if job_id < 0:
764       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
765
766     return str(job_id)
767
768   @classmethod
769   def _GetArchiveDirectory(cls, job_id):
770     """Returns the archive directory for a job.
771
772     @type job_id: str
773     @param job_id: Job identifier
774     @rtype: str
775     @return: Directory name
776
777     """
778     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
779
780   def _NewSerialUnlocked(self):
781     """Generates a new job identifier.
782
783     Job identifiers are unique during the lifetime of a cluster.
784
785     @rtype: str
786     @return: a string representing the job identifier.
787
788     """
789     # New number
790     serial = self._last_serial + 1
791
792     # Write to file
793     self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
794                                         "%s\n" % serial)
795
796     # Keep it only if we were able to write the file
797     self._last_serial = serial
798
799     return self._FormatJobID(serial)
800
801   @staticmethod
802   def _GetJobPath(job_id):
803     """Returns the job file for a given job id.
804
805     @type job_id: str
806     @param job_id: the job identifier
807     @rtype: str
808     @return: the path to the job file
809
810     """
811     return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
812
813   @classmethod
814   def _GetArchivedJobPath(cls, job_id):
815     """Returns the archived job file for a give job id.
816
817     @type job_id: str
818     @param job_id: the job identifier
819     @rtype: str
820     @return: the path to the archived job file
821
822     """
823     path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
824     return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
825
826   @classmethod
827   def _ExtractJobID(cls, name):
828     """Extract the job id from a filename.
829
830     @type name: str
831     @param name: the job filename
832     @rtype: job id or None
833     @return: the job id corresponding to the given filename,
834         or None if the filename does not represent a valid
835         job file
836
837     """
838     m = cls._RE_JOB_FILE.match(name)
839     if m:
840       return m.group(1)
841     else:
842       return None
843
844   def _GetJobIDsUnlocked(self, archived=False):
845     """Return all known job IDs.
846
847     If the parameter archived is True, archived jobs IDs will be
848     included. Currently this argument is unused.
849
850     The method only looks at disk because it's a requirement that all
851     jobs are present on disk (so in the _memcache we don't have any
852     extra IDs).
853
854     @rtype: list
855     @return: the list of job IDs
856
857     """
858     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
859     jlist = utils.NiceSort(jlist)
860     return jlist
861
862   def _ListJobFiles(self):
863     """Returns the list of current job files.
864
865     @rtype: list
866     @return: the list of job file names
867
868     """
869     return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
870             if self._RE_JOB_FILE.match(name)]
871
872   def _LoadJobUnlocked(self, job_id):
873     """Loads a job from the disk or memory.
874
875     Given a job id, this will return the cached job object if
876     existing, or try to load the job from the disk. If loading from
877     disk, it will also add the job to the cache.
878
879     @param job_id: the job id
880     @rtype: L{_QueuedJob} or None
881     @return: either None or the job object
882
883     """
884     job = self._memcache.get(job_id, None)
885     if job:
886       logging.debug("Found job %s in memcache", job_id)
887       return job
888
889     filepath = self._GetJobPath(job_id)
890     logging.debug("Loading job from %s", filepath)
891     try:
892       fd = open(filepath, "r")
893     except IOError, err:
894       if err.errno in (errno.ENOENT, ):
895         return None
896       raise
897     try:
898       data = serializer.LoadJson(fd.read())
899     finally:
900       fd.close()
901
902     try:
903       job = _QueuedJob.Restore(self, data)
904     except Exception, err:
905       new_path = self._GetArchivedJobPath(job_id)
906       if filepath == new_path:
907         # job already archived (future case)
908         logging.exception("Can't parse job %s", job_id)
909       else:
910         # non-archived case
911         logging.exception("Can't parse job %s, will archive.", job_id)
912         self._RenameFilesUnlocked([(filepath, new_path)])
913       return None
914
915     self._memcache[job_id] = job
916     logging.debug("Added job %s to the cache", job_id)
917     return job
918
919   def _GetJobsUnlocked(self, job_ids):
920     """Return a list of jobs based on their IDs.
921
922     @type job_ids: list
923     @param job_ids: either an empty list (meaning all jobs),
924         or a list of job IDs
925     @rtype: list
926     @return: the list of job objects
927
928     """
929     if not job_ids:
930       job_ids = self._GetJobIDsUnlocked()
931
932     return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
933
934   @staticmethod
935   def _IsQueueMarkedDrain():
936     """Check if the queue is marked from drain.
937
938     This currently uses the queue drain file, which makes it a
939     per-node flag. In the future this can be moved to the config file.
940
941     @rtype: boolean
942     @return: True of the job queue is marked for draining
943
944     """
945     return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
946
947   @staticmethod
948   def SetDrainFlag(drain_flag):
949     """Sets the drain flag for the queue.
950
951     This is similar to the function L{backend.JobQueueSetDrainFlag},
952     and in the future we might merge them.
953
954     @type drain_flag: boolean
955     @param drain_flag: Whether to set or unset the drain flag
956
957     """
958     if drain_flag:
959       utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
960     else:
961       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
962     return True
963
964   @utils.LockedMethod
965   @_RequireOpenQueue
966   def SubmitJob(self, ops):
967     """Create and store a new job.
968
969     This enters the job into our job queue and also puts it on the new
970     queue, in order for it to be picked up by the queue processors.
971
972     @type ops: list
973     @param ops: The list of OpCodes that will become the new job.
974     @rtype: job ID
975     @return: the job ID of the newly created job
976     @raise errors.JobQueueDrainError: if the job is marked for draining
977
978     """
979     if self._IsQueueMarkedDrain():
980       raise errors.JobQueueDrainError()
981
982     # Check job queue size
983     size = len(self._ListJobFiles())
984     if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
985       # TODO: Autoarchive jobs. Make sure it's not done on every job
986       # submission, though.
987       #size = ...
988       pass
989
990     if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
991       raise errors.JobQueueFull()
992
993     # Get job identifier
994     job_id = self._NewSerialUnlocked()
995     job = _QueuedJob(self, job_id, ops)
996
997     # Write to disk
998     self.UpdateJobUnlocked(job)
999
1000     logging.debug("Adding new job %s to the cache", job_id)
1001     self._memcache[job_id] = job
1002
1003     # Add to worker pool
1004     self._wpool.AddTask(job)
1005
1006     return job.id
1007
1008   @_RequireOpenQueue
1009   def UpdateJobUnlocked(self, job):
1010     """Update a job's on disk storage.
1011
1012     After a job has been modified, this function needs to be called in
1013     order to write the changes to disk and replicate them to the other
1014     nodes.
1015
1016     @type job: L{_QueuedJob}
1017     @param job: the changed job
1018
1019     """
1020     filename = self._GetJobPath(job.id)
1021     data = serializer.DumpJson(job.Serialize(), indent=False)
1022     logging.debug("Writing job %s to %s", job.id, filename)
1023     self._WriteAndReplicateFileUnlocked(filename, data)
1024
1025     # Notify waiters about potential changes
1026     job.change.notifyAll()
1027
1028   @utils.LockedMethod
1029   @_RequireOpenQueue
1030   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1031                         timeout):
1032     """Waits for changes in a job.
1033
1034     @type job_id: string
1035     @param job_id: Job identifier
1036     @type fields: list of strings
1037     @param fields: Which fields to check for changes
1038     @type prev_job_info: list or None
1039     @param prev_job_info: Last job information returned
1040     @type prev_log_serial: int
1041     @param prev_log_serial: Last job message serial number
1042     @type timeout: float
1043     @param timeout: maximum time to wait
1044     @rtype: tuple (job info, log entries)
1045     @return: a tuple of the job information as required via
1046         the fields parameter, and the log entries as a list
1047
1048         if the job has not changed and the timeout has expired,
1049         we instead return a special value,
1050         L{constants.JOB_NOTCHANGED}, which should be interpreted
1051         as such by the clients
1052
1053     """
1054     logging.debug("Waiting for changes in job %s", job_id)
1055     end_time = time.time() + timeout
1056     while True:
1057       delta_time = end_time - time.time()
1058       if delta_time < 0:
1059         return constants.JOB_NOTCHANGED
1060
1061       job = self._LoadJobUnlocked(job_id)
1062       if not job:
1063         logging.debug("Job %s not found", job_id)
1064         break
1065
1066       status = job.CalcStatus()
1067       job_info = self._GetJobInfoUnlocked(job, fields)
1068       log_entries = job.GetLogEntries(prev_log_serial)
1069
1070       # Serializing and deserializing data can cause type changes (e.g. from
1071       # tuple to list) or precision loss. We're doing it here so that we get
1072       # the same modifications as the data received from the client. Without
1073       # this, the comparison afterwards might fail without the data being
1074       # significantly different.
1075       job_info = serializer.LoadJson(serializer.DumpJson(job_info))
1076       log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
1077
1078       if status not in (constants.JOB_STATUS_QUEUED,
1079                         constants.JOB_STATUS_RUNNING,
1080                         constants.JOB_STATUS_WAITLOCK):
1081         # Don't even try to wait if the job is no longer running, there will be
1082         # no changes.
1083         break
1084
1085       if (prev_job_info != job_info or
1086           (log_entries and prev_log_serial != log_entries[0][0])):
1087         break
1088
1089       logging.debug("Waiting again")
1090
1091       # Release the queue lock while waiting
1092       job.change.wait(delta_time)
1093
1094     logging.debug("Job %s changed", job_id)
1095
1096     return (job_info, log_entries)
1097
1098   @utils.LockedMethod
1099   @_RequireOpenQueue
1100   def CancelJob(self, job_id):
1101     """Cancels a job.
1102
1103     This will only succeed if the job has not started yet.
1104
1105     @type job_id: string
1106     @param job_id: job ID of job to be cancelled.
1107
1108     """
1109     logging.info("Cancelling job %s", job_id)
1110
1111     job = self._LoadJobUnlocked(job_id)
1112     if not job:
1113       logging.debug("Job %s not found", job_id)
1114       return (False, "Job %s not found" % job_id)
1115
1116     job_status = job.CalcStatus()
1117
1118     if job_status not in (constants.JOB_STATUS_QUEUED,
1119                           constants.JOB_STATUS_WAITLOCK):
1120       logging.debug("Job %s is no longer in the queue", job.id)
1121       return (False, "Job %s is no longer in the queue" % job.id)
1122
1123     if job_status == constants.JOB_STATUS_QUEUED:
1124       self.CancelJobUnlocked(job)
1125       return (True, "Job %s canceled" % job.id)
1126
1127     elif job_status == constants.JOB_STATUS_WAITLOCK:
1128       # The worker will notice the new status and cancel the job
1129       try:
1130         for op in job.ops:
1131           op.status = constants.OP_STATUS_CANCELING
1132       finally:
1133         self.UpdateJobUnlocked(job)
1134       return (True, "Job %s will be canceled" % job.id)
1135
1136   @_RequireOpenQueue
1137   def CancelJobUnlocked(self, job):
1138     """Marks a job as canceled.
1139
1140     """
1141     try:
1142       for op in job.ops:
1143         op.status = constants.OP_STATUS_CANCELED
1144         op.result = "Job canceled by request"
1145     finally:
1146       self.UpdateJobUnlocked(job)
1147
1148   @_RequireOpenQueue
1149   def _ArchiveJobsUnlocked(self, jobs):
1150     """Archives jobs.
1151
1152     @type jobs: list of L{_QueuedJob}
1153     @param jobs: Job objects
1154     @rtype: int
1155     @return: Number of archived jobs
1156
1157     """
1158     archive_jobs = []
1159     rename_files = []
1160     for job in jobs:
1161       if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1162                                   constants.JOB_STATUS_SUCCESS,
1163                                   constants.JOB_STATUS_ERROR):
1164         logging.debug("Job %s is not yet done", job.id)
1165         continue
1166
1167       archive_jobs.append(job)
1168
1169       old = self._GetJobPath(job.id)
1170       new = self._GetArchivedJobPath(job.id)
1171       rename_files.append((old, new))
1172
1173     # TODO: What if 1..n files fail to rename?
1174     self._RenameFilesUnlocked(rename_files)
1175
1176     logging.debug("Successfully archived job(s) %s",
1177                   ", ".join(job.id for job in archive_jobs))
1178
1179     return len(archive_jobs)
1180
1181   @utils.LockedMethod
1182   @_RequireOpenQueue
1183   def ArchiveJob(self, job_id):
1184     """Archives a job.
1185
1186     This is just a wrapper over L{_ArchiveJobsUnlocked}.
1187
1188     @type job_id: string
1189     @param job_id: Job ID of job to be archived.
1190     @rtype: bool
1191     @return: Whether job was archived
1192
1193     """
1194     logging.info("Archiving job %s", job_id)
1195
1196     job = self._LoadJobUnlocked(job_id)
1197     if not job:
1198       logging.debug("Job %s not found", job_id)
1199       return False
1200
1201     return self._ArchiveJobsUnlocked([job]) == 1
1202
1203   @utils.LockedMethod
1204   @_RequireOpenQueue
1205   def AutoArchiveJobs(self, age, timeout):
1206     """Archives all jobs based on age.
1207
1208     The method will archive all jobs which are older than the age
1209     parameter. For jobs that don't have an end timestamp, the start
1210     timestamp will be considered. The special '-1' age will cause
1211     archival of all jobs (that are not running or queued).
1212
1213     @type age: int
1214     @param age: the minimum age in seconds
1215
1216     """
1217     logging.info("Archiving jobs with age more than %s seconds", age)
1218
1219     now = time.time()
1220     end_time = now + timeout
1221     archived_count = 0
1222     last_touched = 0
1223
1224     all_job_ids = self._GetJobIDsUnlocked(archived=False)
1225     pending = []
1226     for idx, job_id in enumerate(all_job_ids):
1227       last_touched = idx
1228
1229       # Not optimal because jobs could be pending
1230       # TODO: Measure average duration for job archival and take number of
1231       # pending jobs into account.
1232       if time.time() > end_time:
1233         break
1234
1235       # Returns None if the job failed to load
1236       job = self._LoadJobUnlocked(job_id)
1237       if job:
1238         if job.end_timestamp is None:
1239           if job.start_timestamp is None:
1240             job_age = job.received_timestamp
1241           else:
1242             job_age = job.start_timestamp
1243         else:
1244           job_age = job.end_timestamp
1245
1246         if age == -1 or now - job_age[0] > age:
1247           pending.append(job)
1248
1249           # Archive 10 jobs at a time
1250           if len(pending) >= 10:
1251             archived_count += self._ArchiveJobsUnlocked(pending)
1252             pending = []
1253
1254     if pending:
1255       archived_count += self._ArchiveJobsUnlocked(pending)
1256
1257     return (archived_count, len(all_job_ids) - last_touched - 1)
1258
1259   def _GetJobInfoUnlocked(self, job, fields):
1260     """Returns information about a job.
1261
1262     @type job: L{_QueuedJob}
1263     @param job: the job which we query
1264     @type fields: list
1265     @param fields: names of fields to return
1266     @rtype: list
1267     @return: list with one element for each field
1268     @raise errors.OpExecError: when an invalid field
1269         has been passed
1270
1271     """
1272     row = []
1273     for fname in fields:
1274       if fname == "id":
1275         row.append(job.id)
1276       elif fname == "status":
1277         row.append(job.CalcStatus())
1278       elif fname == "ops":
1279         row.append([op.input.__getstate__() for op in job.ops])
1280       elif fname == "opresult":
1281         row.append([op.result for op in job.ops])
1282       elif fname == "opstatus":
1283         row.append([op.status for op in job.ops])
1284       elif fname == "oplog":
1285         row.append([op.log for op in job.ops])
1286       elif fname == "opstart":
1287         row.append([op.start_timestamp for op in job.ops])
1288       elif fname == "opend":
1289         row.append([op.end_timestamp for op in job.ops])
1290       elif fname == "received_ts":
1291         row.append(job.received_timestamp)
1292       elif fname == "start_ts":
1293         row.append(job.start_timestamp)
1294       elif fname == "end_ts":
1295         row.append(job.end_timestamp)
1296       elif fname == "summary":
1297         row.append([op.input.Summary() for op in job.ops])
1298       else:
1299         raise errors.OpExecError("Invalid job query field '%s'" % fname)
1300     return row
1301
1302   @utils.LockedMethod
1303   @_RequireOpenQueue
1304   def QueryJobs(self, job_ids, fields):
1305     """Returns a list of jobs in queue.
1306
1307     This is a wrapper of L{_GetJobsUnlocked}, which actually does the
1308     processing for each job.
1309
1310     @type job_ids: list
1311     @param job_ids: sequence of job identifiers or None for all
1312     @type fields: list
1313     @param fields: names of fields to return
1314     @rtype: list
1315     @return: list one element per job, each element being list with
1316         the requested fields
1317
1318     """
1319     jobs = []
1320
1321     for job in self._GetJobsUnlocked(job_ids):
1322       if job is None:
1323         jobs.append(None)
1324       else:
1325         jobs.append(self._GetJobInfoUnlocked(job, fields))
1326
1327     return jobs
1328
1329   @utils.LockedMethod
1330   @_RequireOpenQueue
1331   def Shutdown(self):
1332     """Stops the job queue.
1333
1334     This shutdowns all the worker threads an closes the queue.
1335
1336     """
1337     self._wpool.TerminateWorkers()
1338
1339     self._queue_lock.Close()
1340     self._queue_lock = None