Merge commit 'origin/next' into branch-2.1
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import os
33 import logging
34 import threading
35 import errno
36 import re
37 import time
38 import weakref
39
40 from ganeti import constants
41 from ganeti import serializer
42 from ganeti import workerpool
43 from ganeti import opcodes
44 from ganeti import errors
45 from ganeti import mcpu
46 from ganeti import utils
47 from ganeti import jstore
48 from ganeti import rpc
49
50
51 JOBQUEUE_THREADS = 25
52 JOBS_PER_ARCHIVE_DIRECTORY = 10000
53
54
55 class CancelJob(Exception):
56   """Special exception to cancel a job.
57
58   """
59
60
61 def TimeStampNow():
62   """Returns the current timestamp.
63
64   @rtype: tuple
65   @return: the current time in the (seconds, microseconds) format
66
67   """
68   return utils.SplitTime(time.time())
69
70
71 class _QueuedOpCode(object):
72   """Encapsulates an opcode object.
73
74   @ivar log: holds the execution log and consists of tuples
75   of the form C{(log_serial, timestamp, level, message)}
76   @ivar input: the OpCode we encapsulate
77   @ivar status: the current status
78   @ivar result: the result of the LU execution
79   @ivar start_timestamp: timestamp for the start of the execution
80   @ivar stop_timestamp: timestamp for the end of the execution
81
82   """
83   __slots__ = ["input", "status", "result", "log",
84                "start_timestamp", "end_timestamp",
85                "__weakref__"]
86
87   def __init__(self, op):
88     """Constructor for the _QuededOpCode.
89
90     @type op: L{opcodes.OpCode}
91     @param op: the opcode we encapsulate
92
93     """
94     self.input = op
95     self.status = constants.OP_STATUS_QUEUED
96     self.result = None
97     self.log = []
98     self.start_timestamp = None
99     self.end_timestamp = None
100
101   @classmethod
102   def Restore(cls, state):
103     """Restore the _QueuedOpCode from the serialized form.
104
105     @type state: dict
106     @param state: the serialized state
107     @rtype: _QueuedOpCode
108     @return: a new _QueuedOpCode instance
109
110     """
111     obj = _QueuedOpCode.__new__(cls)
112     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
113     obj.status = state["status"]
114     obj.result = state["result"]
115     obj.log = state["log"]
116     obj.start_timestamp = state.get("start_timestamp", None)
117     obj.end_timestamp = state.get("end_timestamp", None)
118     return obj
119
120   def Serialize(self):
121     """Serializes this _QueuedOpCode.
122
123     @rtype: dict
124     @return: the dictionary holding the serialized state
125
126     """
127     return {
128       "input": self.input.__getstate__(),
129       "status": self.status,
130       "result": self.result,
131       "log": self.log,
132       "start_timestamp": self.start_timestamp,
133       "end_timestamp": self.end_timestamp,
134       }
135
136
137 class _QueuedJob(object):
138   """In-memory job representation.
139
140   This is what we use to track the user-submitted jobs. Locking must
141   be taken care of by users of this class.
142
143   @type queue: L{JobQueue}
144   @ivar queue: the parent queue
145   @ivar id: the job ID
146   @type ops: list
147   @ivar ops: the list of _QueuedOpCode that constitute the job
148   @type run_op_index: int
149   @ivar run_op_index: the currently executing opcode, or -1 if
150       we didn't yet start executing
151   @type log_serial: int
152   @ivar log_serial: holds the index for the next log entry
153   @ivar received_timestamp: the timestamp for when the job was received
154   @ivar start_timestmap: the timestamp for start of execution
155   @ivar end_timestamp: the timestamp for end of execution
156   @ivar change: a Condition variable we use for waiting for job changes
157
158   """
159   __slots__ = ["queue", "id", "ops", "run_op_index", "log_serial",
160                "received_timestamp", "start_timestamp", "end_timestamp",
161                "change",
162                "__weakref__"]
163
164   def __init__(self, queue, job_id, ops):
165     """Constructor for the _QueuedJob.
166
167     @type queue: L{JobQueue}
168     @param queue: our parent queue
169     @type job_id: job_id
170     @param job_id: our job id
171     @type ops: list
172     @param ops: the list of opcodes we hold, which will be encapsulated
173         in _QueuedOpCodes
174
175     """
176     if not ops:
177       # TODO: use a better exception
178       raise Exception("No opcodes")
179
180     self.queue = queue
181     self.id = job_id
182     self.ops = [_QueuedOpCode(op) for op in ops]
183     self.run_op_index = -1
184     self.log_serial = 0
185     self.received_timestamp = TimeStampNow()
186     self.start_timestamp = None
187     self.end_timestamp = None
188
189     # Condition to wait for changes
190     self.change = threading.Condition(self.queue._lock)
191
192   @classmethod
193   def Restore(cls, queue, state):
194     """Restore a _QueuedJob from serialized state:
195
196     @type queue: L{JobQueue}
197     @param queue: to which queue the restored job belongs
198     @type state: dict
199     @param state: the serialized state
200     @rtype: _JobQueue
201     @return: the restored _JobQueue instance
202
203     """
204     obj = _QueuedJob.__new__(cls)
205     obj.queue = queue
206     obj.id = state["id"]
207     obj.run_op_index = state["run_op_index"]
208     obj.received_timestamp = state.get("received_timestamp", None)
209     obj.start_timestamp = state.get("start_timestamp", None)
210     obj.end_timestamp = state.get("end_timestamp", None)
211
212     obj.ops = []
213     obj.log_serial = 0
214     for op_state in state["ops"]:
215       op = _QueuedOpCode.Restore(op_state)
216       for log_entry in op.log:
217         obj.log_serial = max(obj.log_serial, log_entry[0])
218       obj.ops.append(op)
219
220     # Condition to wait for changes
221     obj.change = threading.Condition(obj.queue._lock)
222
223     return obj
224
225   def Serialize(self):
226     """Serialize the _JobQueue instance.
227
228     @rtype: dict
229     @return: the serialized state
230
231     """
232     return {
233       "id": self.id,
234       "ops": [op.Serialize() for op in self.ops],
235       "run_op_index": self.run_op_index,
236       "start_timestamp": self.start_timestamp,
237       "end_timestamp": self.end_timestamp,
238       "received_timestamp": self.received_timestamp,
239       }
240
241   def CalcStatus(self):
242     """Compute the status of this job.
243
244     This function iterates over all the _QueuedOpCodes in the job and
245     based on their status, computes the job status.
246
247     The algorithm is:
248       - if we find a cancelled, or finished with error, the job
249         status will be the same
250       - otherwise, the last opcode with the status one of:
251           - waitlock
252           - canceling
253           - running
254
255         will determine the job status
256
257       - otherwise, it means either all opcodes are queued, or success,
258         and the job status will be the same
259
260     @return: the job status
261
262     """
263     status = constants.JOB_STATUS_QUEUED
264
265     all_success = True
266     for op in self.ops:
267       if op.status == constants.OP_STATUS_SUCCESS:
268         continue
269
270       all_success = False
271
272       if op.status == constants.OP_STATUS_QUEUED:
273         pass
274       elif op.status == constants.OP_STATUS_WAITLOCK:
275         status = constants.JOB_STATUS_WAITLOCK
276       elif op.status == constants.OP_STATUS_RUNNING:
277         status = constants.JOB_STATUS_RUNNING
278       elif op.status == constants.OP_STATUS_CANCELING:
279         status = constants.JOB_STATUS_CANCELING
280         break
281       elif op.status == constants.OP_STATUS_ERROR:
282         status = constants.JOB_STATUS_ERROR
283         # The whole job fails if one opcode failed
284         break
285       elif op.status == constants.OP_STATUS_CANCELED:
286         status = constants.OP_STATUS_CANCELED
287         break
288
289     if all_success:
290       status = constants.JOB_STATUS_SUCCESS
291
292     return status
293
294   def GetLogEntries(self, newer_than):
295     """Selectively returns the log entries.
296
297     @type newer_than: None or int
298     @param newer_than: if this is None, return all log entries,
299         otherwise return only the log entries with serial higher
300         than this value
301     @rtype: list
302     @return: the list of the log entries selected
303
304     """
305     if newer_than is None:
306       serial = -1
307     else:
308       serial = newer_than
309
310     entries = []
311     for op in self.ops:
312       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
313
314     return entries
315
316   def MarkUnfinishedOps(self, status, result):
317     """Mark unfinished opcodes with a given status and result.
318
319     This is an utility function for marking all running or waiting to
320     be run opcodes with a given status. Opcodes which are already
321     finalised are not changed.
322
323     @param status: a given opcode status
324     @param result: the opcode result
325
326     """
327     not_marked = True
328     for op in self.ops:
329       if op.status in constants.OPS_FINALIZED:
330         assert not_marked, "Finalized opcodes found after non-finalized ones"
331         continue
332       op.status = status
333       op.result = result
334       not_marked = False
335
336
337 class _JobQueueWorker(workerpool.BaseWorker):
338   """The actual job workers.
339
340   """
341   def _NotifyStart(self):
342     """Mark the opcode as running, not lock-waiting.
343
344     This is called from the mcpu code as a notifier function, when the
345     LU is finally about to start the Exec() method. Of course, to have
346     end-user visible results, the opcode must be initially (before
347     calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
348
349     """
350     assert self.queue, "Queue attribute is missing"
351     assert self.opcode, "Opcode attribute is missing"
352
353     self.queue.acquire()
354     try:
355       assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
356                                     constants.OP_STATUS_CANCELING)
357
358       # Cancel here if we were asked to
359       if self.opcode.status == constants.OP_STATUS_CANCELING:
360         raise CancelJob()
361
362       self.opcode.status = constants.OP_STATUS_RUNNING
363     finally:
364       self.queue.release()
365
366   def RunTask(self, job):
367     """Job executor.
368
369     This functions processes a job. It is closely tied to the _QueuedJob and
370     _QueuedOpCode classes.
371
372     @type job: L{_QueuedJob}
373     @param job: the job to be processed
374
375     """
376     logging.info("Worker %s processing job %s",
377                   self.worker_id, job.id)
378     proc = mcpu.Processor(self.pool.queue.context)
379     self.queue = queue = job.queue
380     try:
381       try:
382         count = len(job.ops)
383         for idx, op in enumerate(job.ops):
384           op_summary = op.input.Summary()
385           if op.status == constants.OP_STATUS_SUCCESS:
386             # this is a job that was partially completed before master
387             # daemon shutdown, so it can be expected that some opcodes
388             # are already completed successfully (if any did error
389             # out, then the whole job should have been aborted and not
390             # resubmitted for processing)
391             logging.info("Op %s/%s: opcode %s already processed, skipping",
392                          idx + 1, count, op_summary)
393             continue
394           try:
395             logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
396                          op_summary)
397
398             queue.acquire()
399             try:
400               if op.status == constants.OP_STATUS_CANCELED:
401                 raise CancelJob()
402               assert op.status == constants.OP_STATUS_QUEUED
403               job.run_op_index = idx
404               op.status = constants.OP_STATUS_WAITLOCK
405               op.result = None
406               op.start_timestamp = TimeStampNow()
407               if idx == 0: # first opcode
408                 job.start_timestamp = op.start_timestamp
409               queue.UpdateJobUnlocked(job)
410
411               input_opcode = op.input
412             finally:
413               queue.release()
414
415             def _Log(*args):
416               """Append a log entry.
417
418               """
419               assert len(args) < 3
420
421               if len(args) == 1:
422                 log_type = constants.ELOG_MESSAGE
423                 log_msg = args[0]
424               else:
425                 log_type, log_msg = args
426
427               # The time is split to make serialization easier and not lose
428               # precision.
429               timestamp = utils.SplitTime(time.time())
430
431               queue.acquire()
432               try:
433                 job.log_serial += 1
434                 op.log.append((job.log_serial, timestamp, log_type, log_msg))
435
436                 job.change.notifyAll()
437               finally:
438                 queue.release()
439
440             # Make sure not to hold lock while _Log is called
441             self.opcode = op
442             result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
443
444             queue.acquire()
445             try:
446               op.status = constants.OP_STATUS_SUCCESS
447               op.result = result
448               op.end_timestamp = TimeStampNow()
449               queue.UpdateJobUnlocked(job)
450             finally:
451               queue.release()
452
453             logging.info("Op %s/%s: Successfully finished opcode %s",
454                          idx + 1, count, op_summary)
455           except CancelJob:
456             # Will be handled further up
457             raise
458           except Exception, err:
459             queue.acquire()
460             try:
461               try:
462                 op.status = constants.OP_STATUS_ERROR
463                 op.result = str(err)
464                 op.end_timestamp = TimeStampNow()
465                 logging.info("Op %s/%s: Error in opcode %s: %s",
466                              idx + 1, count, op_summary, err)
467               finally:
468                 queue.UpdateJobUnlocked(job)
469             finally:
470               queue.release()
471             raise
472
473       except CancelJob:
474         queue.acquire()
475         try:
476           queue.CancelJobUnlocked(job)
477         finally:
478           queue.release()
479       except errors.GenericError, err:
480         logging.exception("Ganeti exception")
481       except:
482         logging.exception("Unhandled exception")
483     finally:
484       queue.acquire()
485       try:
486         try:
487           job.run_op_index = -1
488           job.end_timestamp = TimeStampNow()
489           queue.UpdateJobUnlocked(job)
490         finally:
491           job_id = job.id
492           status = job.CalcStatus()
493       finally:
494         queue.release()
495       logging.info("Worker %s finished job %s, status = %s",
496                    self.worker_id, job_id, status)
497
498
499 class _JobQueueWorkerPool(workerpool.WorkerPool):
500   """Simple class implementing a job-processing workerpool.
501
502   """
503   def __init__(self, queue):
504     super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
505                                               _JobQueueWorker)
506     self.queue = queue
507
508
509 class JobQueue(object):
510   """Queue used to manage the jobs.
511
512   @cvar _RE_JOB_FILE: regex matching the valid job file names
513
514   """
515   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
516
517   def _RequireOpenQueue(fn):
518     """Decorator for "public" functions.
519
520     This function should be used for all 'public' functions. That is,
521     functions usually called from other classes.
522
523     @warning: Use this decorator only after utils.LockedMethod!
524
525     Example::
526       @utils.LockedMethod
527       @_RequireOpenQueue
528       def Example(self):
529         pass
530
531     """
532     def wrapper(self, *args, **kwargs):
533       assert self._queue_lock is not None, "Queue should be open"
534       return fn(self, *args, **kwargs)
535     return wrapper
536
537   def __init__(self, context):
538     """Constructor for JobQueue.
539
540     The constructor will initialize the job queue object and then
541     start loading the current jobs from disk, either for starting them
542     (if they were queue) or for aborting them (if they were already
543     running).
544
545     @type context: GanetiContext
546     @param context: the context object for access to the configuration
547         data and other ganeti objects
548
549     """
550     self.context = context
551     self._memcache = weakref.WeakValueDictionary()
552     self._my_hostname = utils.HostInfo().name
553
554     # Locking
555     self._lock = threading.Lock()
556     self.acquire = self._lock.acquire
557     self.release = self._lock.release
558
559     # Initialize
560     self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
561
562     # Read serial file
563     self._last_serial = jstore.ReadSerial()
564     assert self._last_serial is not None, ("Serial file was modified between"
565                                            " check in jstore and here")
566
567     # Get initial list of nodes
568     self._nodes = dict((n.name, n.primary_ip)
569                        for n in self.context.cfg.GetAllNodesInfo().values()
570                        if n.master_candidate)
571
572     # Remove master node
573     try:
574       del self._nodes[self._my_hostname]
575     except KeyError:
576       pass
577
578     # TODO: Check consistency across nodes
579
580     # Setup worker pool
581     self._wpool = _JobQueueWorkerPool(self)
582     try:
583       # We need to lock here because WorkerPool.AddTask() may start a job while
584       # we're still doing our work.
585       self.acquire()
586       try:
587         logging.info("Inspecting job queue")
588
589         all_job_ids = self._GetJobIDsUnlocked()
590         jobs_count = len(all_job_ids)
591         lastinfo = time.time()
592         for idx, job_id in enumerate(all_job_ids):
593           # Give an update every 1000 jobs or 10 seconds
594           if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
595               idx == (jobs_count - 1)):
596             logging.info("Job queue inspection: %d/%d (%0.1f %%)",
597                          idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
598             lastinfo = time.time()
599
600           job = self._LoadJobUnlocked(job_id)
601
602           # a failure in loading the job can cause 'None' to be returned
603           if job is None:
604             continue
605
606           status = job.CalcStatus()
607
608           if status in (constants.JOB_STATUS_QUEUED, ):
609             self._wpool.AddTask(job)
610
611           elif status in (constants.JOB_STATUS_RUNNING,
612                           constants.JOB_STATUS_WAITLOCK,
613                           constants.JOB_STATUS_CANCELING):
614             logging.warning("Unfinished job %s found: %s", job.id, job)
615             try:
616               job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
617                                     "Unclean master daemon shutdown")
618             finally:
619               self.UpdateJobUnlocked(job)
620
621         logging.info("Job queue inspection finished")
622       finally:
623         self.release()
624     except:
625       self._wpool.TerminateWorkers()
626       raise
627
628   @utils.LockedMethod
629   @_RequireOpenQueue
630   def AddNode(self, node):
631     """Register a new node with the queue.
632
633     @type node: L{objects.Node}
634     @param node: the node object to be added
635
636     """
637     node_name = node.name
638     assert node_name != self._my_hostname
639
640     # Clean queue directory on added node
641     result = rpc.RpcRunner.call_jobqueue_purge(node_name)
642     msg = result.RemoteFailMsg()
643     if msg:
644       logging.warning("Cannot cleanup queue directory on node %s: %s",
645                       node_name, msg)
646
647     if not node.master_candidate:
648       # remove if existing, ignoring errors
649       self._nodes.pop(node_name, None)
650       # and skip the replication of the job ids
651       return
652
653     # Upload the whole queue excluding archived jobs
654     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
655
656     # Upload current serial file
657     files.append(constants.JOB_QUEUE_SERIAL_FILE)
658
659     for file_name in files:
660       # Read file content
661       fd = open(file_name, "r")
662       try:
663         content = fd.read()
664       finally:
665         fd.close()
666
667       result = rpc.RpcRunner.call_jobqueue_update([node_name],
668                                                   [node.primary_ip],
669                                                   file_name, content)
670       msg = result[node_name].RemoteFailMsg()
671       if msg:
672         logging.error("Failed to upload file %s to node %s: %s",
673                       file_name, node_name, msg)
674
675     self._nodes[node_name] = node.primary_ip
676
677   @utils.LockedMethod
678   @_RequireOpenQueue
679   def RemoveNode(self, node_name):
680     """Callback called when removing nodes from the cluster.
681
682     @type node_name: str
683     @param node_name: the name of the node to remove
684
685     """
686     try:
687       # The queue is removed by the "leave node" RPC call.
688       del self._nodes[node_name]
689     except KeyError:
690       pass
691
692   def _CheckRpcResult(self, result, nodes, failmsg):
693     """Verifies the status of an RPC call.
694
695     Since we aim to keep consistency should this node (the current
696     master) fail, we will log errors if our rpc fail, and especially
697     log the case when more than half of the nodes fails.
698
699     @param result: the data as returned from the rpc call
700     @type nodes: list
701     @param nodes: the list of nodes we made the call to
702     @type failmsg: str
703     @param failmsg: the identifier to be used for logging
704
705     """
706     failed = []
707     success = []
708
709     for node in nodes:
710       msg = result[node].RemoteFailMsg()
711       if msg:
712         failed.append(node)
713         logging.error("RPC call %s failed on node %s: %s",
714                       result[node].call, node, msg)
715       else:
716         success.append(node)
717
718     # +1 for the master node
719     if (len(success) + 1) < len(failed):
720       # TODO: Handle failing nodes
721       logging.error("More than half of the nodes failed")
722
723   def _GetNodeIp(self):
724     """Helper for returning the node name/ip list.
725
726     @rtype: (list, list)
727     @return: a tuple of two lists, the first one with the node
728         names and the second one with the node addresses
729
730     """
731     name_list = self._nodes.keys()
732     addr_list = [self._nodes[name] for name in name_list]
733     return name_list, addr_list
734
735   def _WriteAndReplicateFileUnlocked(self, file_name, data):
736     """Writes a file locally and then replicates it to all nodes.
737
738     This function will replace the contents of a file on the local
739     node and then replicate it to all the other nodes we have.
740
741     @type file_name: str
742     @param file_name: the path of the file to be replicated
743     @type data: str
744     @param data: the new contents of the file
745
746     """
747     utils.WriteFile(file_name, data=data)
748
749     names, addrs = self._GetNodeIp()
750     result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
751     self._CheckRpcResult(result, self._nodes,
752                          "Updating %s" % file_name)
753
754   def _RenameFilesUnlocked(self, rename):
755     """Renames a file locally and then replicate the change.
756
757     This function will rename a file in the local queue directory
758     and then replicate this rename to all the other nodes we have.
759
760     @type rename: list of (old, new)
761     @param rename: List containing tuples mapping old to new names
762
763     """
764     # Rename them locally
765     for old, new in rename:
766       utils.RenameFile(old, new, mkdir=True)
767
768     # ... and on all nodes
769     names, addrs = self._GetNodeIp()
770     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
771     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
772
773   def _FormatJobID(self, job_id):
774     """Convert a job ID to string format.
775
776     Currently this just does C{str(job_id)} after performing some
777     checks, but if we want to change the job id format this will
778     abstract this change.
779
780     @type job_id: int or long
781     @param job_id: the numeric job id
782     @rtype: str
783     @return: the formatted job id
784
785     """
786     if not isinstance(job_id, (int, long)):
787       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
788     if job_id < 0:
789       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
790
791     return str(job_id)
792
793   @classmethod
794   def _GetArchiveDirectory(cls, job_id):
795     """Returns the archive directory for a job.
796
797     @type job_id: str
798     @param job_id: Job identifier
799     @rtype: str
800     @return: Directory name
801
802     """
803     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
804
805   def _NewSerialUnlocked(self):
806     """Generates a new job identifier.
807
808     Job identifiers are unique during the lifetime of a cluster.
809
810     @rtype: str
811     @return: a string representing the job identifier.
812
813     """
814     # New number
815     serial = self._last_serial + 1
816
817     # Write to file
818     self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
819                                         "%s\n" % serial)
820
821     # Keep it only if we were able to write the file
822     self._last_serial = serial
823
824     return self._FormatJobID(serial)
825
826   @staticmethod
827   def _GetJobPath(job_id):
828     """Returns the job file for a given job id.
829
830     @type job_id: str
831     @param job_id: the job identifier
832     @rtype: str
833     @return: the path to the job file
834
835     """
836     return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
837
838   @classmethod
839   def _GetArchivedJobPath(cls, job_id):
840     """Returns the archived job file for a give job id.
841
842     @type job_id: str
843     @param job_id: the job identifier
844     @rtype: str
845     @return: the path to the archived job file
846
847     """
848     path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
849     return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
850
851   @classmethod
852   def _ExtractJobID(cls, name):
853     """Extract the job id from a filename.
854
855     @type name: str
856     @param name: the job filename
857     @rtype: job id or None
858     @return: the job id corresponding to the given filename,
859         or None if the filename does not represent a valid
860         job file
861
862     """
863     m = cls._RE_JOB_FILE.match(name)
864     if m:
865       return m.group(1)
866     else:
867       return None
868
869   def _GetJobIDsUnlocked(self, archived=False):
870     """Return all known job IDs.
871
872     If the parameter archived is True, archived jobs IDs will be
873     included. Currently this argument is unused.
874
875     The method only looks at disk because it's a requirement that all
876     jobs are present on disk (so in the _memcache we don't have any
877     extra IDs).
878
879     @rtype: list
880     @return: the list of job IDs
881
882     """
883     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
884     jlist = utils.NiceSort(jlist)
885     return jlist
886
887   def _ListJobFiles(self):
888     """Returns the list of current job files.
889
890     @rtype: list
891     @return: the list of job file names
892
893     """
894     return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
895             if self._RE_JOB_FILE.match(name)]
896
897   def _LoadJobUnlocked(self, job_id):
898     """Loads a job from the disk or memory.
899
900     Given a job id, this will return the cached job object if
901     existing, or try to load the job from the disk. If loading from
902     disk, it will also add the job to the cache.
903
904     @param job_id: the job id
905     @rtype: L{_QueuedJob} or None
906     @return: either None or the job object
907
908     """
909     job = self._memcache.get(job_id, None)
910     if job:
911       logging.debug("Found job %s in memcache", job_id)
912       return job
913
914     filepath = self._GetJobPath(job_id)
915     logging.debug("Loading job from %s", filepath)
916     try:
917       fd = open(filepath, "r")
918     except IOError, err:
919       if err.errno in (errno.ENOENT, ):
920         return None
921       raise
922     try:
923       data = serializer.LoadJson(fd.read())
924     finally:
925       fd.close()
926
927     try:
928       job = _QueuedJob.Restore(self, data)
929     except Exception, err:
930       new_path = self._GetArchivedJobPath(job_id)
931       if filepath == new_path:
932         # job already archived (future case)
933         logging.exception("Can't parse job %s", job_id)
934       else:
935         # non-archived case
936         logging.exception("Can't parse job %s, will archive.", job_id)
937         self._RenameFilesUnlocked([(filepath, new_path)])
938       return None
939
940     self._memcache[job_id] = job
941     logging.debug("Added job %s to the cache", job_id)
942     return job
943
944   def _GetJobsUnlocked(self, job_ids):
945     """Return a list of jobs based on their IDs.
946
947     @type job_ids: list
948     @param job_ids: either an empty list (meaning all jobs),
949         or a list of job IDs
950     @rtype: list
951     @return: the list of job objects
952
953     """
954     if not job_ids:
955       job_ids = self._GetJobIDsUnlocked()
956
957     return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
958
959   @staticmethod
960   def _IsQueueMarkedDrain():
961     """Check if the queue is marked from drain.
962
963     This currently uses the queue drain file, which makes it a
964     per-node flag. In the future this can be moved to the config file.
965
966     @rtype: boolean
967     @return: True of the job queue is marked for draining
968
969     """
970     return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
971
972   @staticmethod
973   def SetDrainFlag(drain_flag):
974     """Sets the drain flag for the queue.
975
976     This is similar to the function L{backend.JobQueueSetDrainFlag},
977     and in the future we might merge them.
978
979     @type drain_flag: boolean
980     @param drain_flag: Whether to set or unset the drain flag
981
982     """
983     if drain_flag:
984       utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
985     else:
986       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
987     return True
988
989   @_RequireOpenQueue
990   def _SubmitJobUnlocked(self, ops):
991     """Create and store a new job.
992
993     This enters the job into our job queue and also puts it on the new
994     queue, in order for it to be picked up by the queue processors.
995
996     @type ops: list
997     @param ops: The list of OpCodes that will become the new job.
998     @rtype: job ID
999     @return: the job ID of the newly created job
1000     @raise errors.JobQueueDrainError: if the job is marked for draining
1001
1002     """
1003     if self._IsQueueMarkedDrain():
1004       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1005
1006     # Check job queue size
1007     size = len(self._ListJobFiles())
1008     if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
1009       # TODO: Autoarchive jobs. Make sure it's not done on every job
1010       # submission, though.
1011       #size = ...
1012       pass
1013
1014     if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1015       raise errors.JobQueueFull()
1016
1017     # Get job identifier
1018     job_id = self._NewSerialUnlocked()
1019     job = _QueuedJob(self, job_id, ops)
1020
1021     # Write to disk
1022     self.UpdateJobUnlocked(job)
1023
1024     logging.debug("Adding new job %s to the cache", job_id)
1025     self._memcache[job_id] = job
1026
1027     # Add to worker pool
1028     self._wpool.AddTask(job)
1029
1030     return job.id
1031
1032   @utils.LockedMethod
1033   @_RequireOpenQueue
1034   def SubmitJob(self, ops):
1035     """Create and store a new job.
1036
1037     @see: L{_SubmitJobUnlocked}
1038
1039     """
1040     return self._SubmitJobUnlocked(ops)
1041
1042   @utils.LockedMethod
1043   @_RequireOpenQueue
1044   def SubmitManyJobs(self, jobs):
1045     """Create and store multiple jobs.
1046
1047     @see: L{_SubmitJobUnlocked}
1048
1049     """
1050     results = []
1051     for ops in jobs:
1052       try:
1053         data = self._SubmitJobUnlocked(ops)
1054         status = True
1055       except errors.GenericError, err:
1056         data = str(err)
1057         status = False
1058       results.append((status, data))
1059
1060     return results
1061
1062
1063   @_RequireOpenQueue
1064   def UpdateJobUnlocked(self, job):
1065     """Update a job's on disk storage.
1066
1067     After a job has been modified, this function needs to be called in
1068     order to write the changes to disk and replicate them to the other
1069     nodes.
1070
1071     @type job: L{_QueuedJob}
1072     @param job: the changed job
1073
1074     """
1075     filename = self._GetJobPath(job.id)
1076     data = serializer.DumpJson(job.Serialize(), indent=False)
1077     logging.debug("Writing job %s to %s", job.id, filename)
1078     self._WriteAndReplicateFileUnlocked(filename, data)
1079
1080     # Notify waiters about potential changes
1081     job.change.notifyAll()
1082
1083   @utils.LockedMethod
1084   @_RequireOpenQueue
1085   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1086                         timeout):
1087     """Waits for changes in a job.
1088
1089     @type job_id: string
1090     @param job_id: Job identifier
1091     @type fields: list of strings
1092     @param fields: Which fields to check for changes
1093     @type prev_job_info: list or None
1094     @param prev_job_info: Last job information returned
1095     @type prev_log_serial: int
1096     @param prev_log_serial: Last job message serial number
1097     @type timeout: float
1098     @param timeout: maximum time to wait
1099     @rtype: tuple (job info, log entries)
1100     @return: a tuple of the job information as required via
1101         the fields parameter, and the log entries as a list
1102
1103         if the job has not changed and the timeout has expired,
1104         we instead return a special value,
1105         L{constants.JOB_NOTCHANGED}, which should be interpreted
1106         as such by the clients
1107
1108     """
1109     logging.debug("Waiting for changes in job %s", job_id)
1110     end_time = time.time() + timeout
1111     while True:
1112       delta_time = end_time - time.time()
1113       if delta_time < 0:
1114         return constants.JOB_NOTCHANGED
1115
1116       job = self._LoadJobUnlocked(job_id)
1117       if not job:
1118         logging.debug("Job %s not found", job_id)
1119         break
1120
1121       status = job.CalcStatus()
1122       job_info = self._GetJobInfoUnlocked(job, fields)
1123       log_entries = job.GetLogEntries(prev_log_serial)
1124
1125       # Serializing and deserializing data can cause type changes (e.g. from
1126       # tuple to list) or precision loss. We're doing it here so that we get
1127       # the same modifications as the data received from the client. Without
1128       # this, the comparison afterwards might fail without the data being
1129       # significantly different.
1130       job_info = serializer.LoadJson(serializer.DumpJson(job_info))
1131       log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
1132
1133       if status not in (constants.JOB_STATUS_QUEUED,
1134                         constants.JOB_STATUS_RUNNING,
1135                         constants.JOB_STATUS_WAITLOCK):
1136         # Don't even try to wait if the job is no longer running, there will be
1137         # no changes.
1138         break
1139
1140       if (prev_job_info != job_info or
1141           (log_entries and prev_log_serial != log_entries[0][0])):
1142         break
1143
1144       logging.debug("Waiting again")
1145
1146       # Release the queue lock while waiting
1147       job.change.wait(delta_time)
1148
1149     logging.debug("Job %s changed", job_id)
1150
1151     return (job_info, log_entries)
1152
1153   @utils.LockedMethod
1154   @_RequireOpenQueue
1155   def CancelJob(self, job_id):
1156     """Cancels a job.
1157
1158     This will only succeed if the job has not started yet.
1159
1160     @type job_id: string
1161     @param job_id: job ID of job to be cancelled.
1162
1163     """
1164     logging.info("Cancelling job %s", job_id)
1165
1166     job = self._LoadJobUnlocked(job_id)
1167     if not job:
1168       logging.debug("Job %s not found", job_id)
1169       return (False, "Job %s not found" % job_id)
1170
1171     job_status = job.CalcStatus()
1172
1173     if job_status not in (constants.JOB_STATUS_QUEUED,
1174                           constants.JOB_STATUS_WAITLOCK):
1175       logging.debug("Job %s is no longer in the queue", job.id)
1176       return (False, "Job %s is no longer in the queue" % job.id)
1177
1178     if job_status == constants.JOB_STATUS_QUEUED:
1179       self.CancelJobUnlocked(job)
1180       return (True, "Job %s canceled" % job.id)
1181
1182     elif job_status == constants.JOB_STATUS_WAITLOCK:
1183       # The worker will notice the new status and cancel the job
1184       try:
1185         job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1186       finally:
1187         self.UpdateJobUnlocked(job)
1188       return (True, "Job %s will be canceled" % job.id)
1189
1190   @_RequireOpenQueue
1191   def CancelJobUnlocked(self, job):
1192     """Marks a job as canceled.
1193
1194     """
1195     try:
1196       job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1197                             "Job canceled by request")
1198     finally:
1199       self.UpdateJobUnlocked(job)
1200
1201   @_RequireOpenQueue
1202   def _ArchiveJobsUnlocked(self, jobs):
1203     """Archives jobs.
1204
1205     @type jobs: list of L{_QueuedJob}
1206     @param jobs: Job objects
1207     @rtype: int
1208     @return: Number of archived jobs
1209
1210     """
1211     archive_jobs = []
1212     rename_files = []
1213     for job in jobs:
1214       if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1215                                   constants.JOB_STATUS_SUCCESS,
1216                                   constants.JOB_STATUS_ERROR):
1217         logging.debug("Job %s is not yet done", job.id)
1218         continue
1219
1220       archive_jobs.append(job)
1221
1222       old = self._GetJobPath(job.id)
1223       new = self._GetArchivedJobPath(job.id)
1224       rename_files.append((old, new))
1225
1226     # TODO: What if 1..n files fail to rename?
1227     self._RenameFilesUnlocked(rename_files)
1228
1229     logging.debug("Successfully archived job(s) %s",
1230                   ", ".join(job.id for job in archive_jobs))
1231
1232     return len(archive_jobs)
1233
1234   @utils.LockedMethod
1235   @_RequireOpenQueue
1236   def ArchiveJob(self, job_id):
1237     """Archives a job.
1238
1239     This is just a wrapper over L{_ArchiveJobsUnlocked}.
1240
1241     @type job_id: string
1242     @param job_id: Job ID of job to be archived.
1243     @rtype: bool
1244     @return: Whether job was archived
1245
1246     """
1247     logging.info("Archiving job %s", job_id)
1248
1249     job = self._LoadJobUnlocked(job_id)
1250     if not job:
1251       logging.debug("Job %s not found", job_id)
1252       return False
1253
1254     return self._ArchiveJobsUnlocked([job]) == 1
1255
1256   @utils.LockedMethod
1257   @_RequireOpenQueue
1258   def AutoArchiveJobs(self, age, timeout):
1259     """Archives all jobs based on age.
1260
1261     The method will archive all jobs which are older than the age
1262     parameter. For jobs that don't have an end timestamp, the start
1263     timestamp will be considered. The special '-1' age will cause
1264     archival of all jobs (that are not running or queued).
1265
1266     @type age: int
1267     @param age: the minimum age in seconds
1268
1269     """
1270     logging.info("Archiving jobs with age more than %s seconds", age)
1271
1272     now = time.time()
1273     end_time = now + timeout
1274     archived_count = 0
1275     last_touched = 0
1276
1277     all_job_ids = self._GetJobIDsUnlocked(archived=False)
1278     pending = []
1279     for idx, job_id in enumerate(all_job_ids):
1280       last_touched = idx
1281
1282       # Not optimal because jobs could be pending
1283       # TODO: Measure average duration for job archival and take number of
1284       # pending jobs into account.
1285       if time.time() > end_time:
1286         break
1287
1288       # Returns None if the job failed to load
1289       job = self._LoadJobUnlocked(job_id)
1290       if job:
1291         if job.end_timestamp is None:
1292           if job.start_timestamp is None:
1293             job_age = job.received_timestamp
1294           else:
1295             job_age = job.start_timestamp
1296         else:
1297           job_age = job.end_timestamp
1298
1299         if age == -1 or now - job_age[0] > age:
1300           pending.append(job)
1301
1302           # Archive 10 jobs at a time
1303           if len(pending) >= 10:
1304             archived_count += self._ArchiveJobsUnlocked(pending)
1305             pending = []
1306
1307     if pending:
1308       archived_count += self._ArchiveJobsUnlocked(pending)
1309
1310     return (archived_count, len(all_job_ids) - last_touched - 1)
1311
1312   def _GetJobInfoUnlocked(self, job, fields):
1313     """Returns information about a job.
1314
1315     @type job: L{_QueuedJob}
1316     @param job: the job which we query
1317     @type fields: list
1318     @param fields: names of fields to return
1319     @rtype: list
1320     @return: list with one element for each field
1321     @raise errors.OpExecError: when an invalid field
1322         has been passed
1323
1324     """
1325     row = []
1326     for fname in fields:
1327       if fname == "id":
1328         row.append(job.id)
1329       elif fname == "status":
1330         row.append(job.CalcStatus())
1331       elif fname == "ops":
1332         row.append([op.input.__getstate__() for op in job.ops])
1333       elif fname == "opresult":
1334         row.append([op.result for op in job.ops])
1335       elif fname == "opstatus":
1336         row.append([op.status for op in job.ops])
1337       elif fname == "oplog":
1338         row.append([op.log for op in job.ops])
1339       elif fname == "opstart":
1340         row.append([op.start_timestamp for op in job.ops])
1341       elif fname == "opend":
1342         row.append([op.end_timestamp for op in job.ops])
1343       elif fname == "received_ts":
1344         row.append(job.received_timestamp)
1345       elif fname == "start_ts":
1346         row.append(job.start_timestamp)
1347       elif fname == "end_ts":
1348         row.append(job.end_timestamp)
1349       elif fname == "summary":
1350         row.append([op.input.Summary() for op in job.ops])
1351       else:
1352         raise errors.OpExecError("Invalid job query field '%s'" % fname)
1353     return row
1354
1355   @utils.LockedMethod
1356   @_RequireOpenQueue
1357   def QueryJobs(self, job_ids, fields):
1358     """Returns a list of jobs in queue.
1359
1360     This is a wrapper of L{_GetJobsUnlocked}, which actually does the
1361     processing for each job.
1362
1363     @type job_ids: list
1364     @param job_ids: sequence of job identifiers or None for all
1365     @type fields: list
1366     @param fields: names of fields to return
1367     @rtype: list
1368     @return: list one element per job, each element being list with
1369         the requested fields
1370
1371     """
1372     jobs = []
1373
1374     for job in self._GetJobsUnlocked(job_ids):
1375       if job is None:
1376         jobs.append(None)
1377       else:
1378         jobs.append(self._GetJobInfoUnlocked(job, fields))
1379
1380     return jobs
1381
1382   @utils.LockedMethod
1383   @_RequireOpenQueue
1384   def Shutdown(self):
1385     """Stops the job queue.
1386
1387     This shutdowns all the worker threads an closes the queue.
1388
1389     """
1390     self._wpool.TerminateWorkers()
1391
1392     self._queue_lock.Close()
1393     self._queue_lock = None