QA: remove the --default-hypervisor option
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import os
33 import logging
34 import threading
35 import errno
36 import re
37 import time
38 import weakref
39
40 from ganeti import constants
41 from ganeti import serializer
42 from ganeti import workerpool
43 from ganeti import opcodes
44 from ganeti import errors
45 from ganeti import mcpu
46 from ganeti import utils
47 from ganeti import jstore
48 from ganeti import rpc
49
50
51 JOBQUEUE_THREADS = 25
52 JOBS_PER_ARCHIVE_DIRECTORY = 10000
53
54
55 class CancelJob(Exception):
56   """Special exception to cancel a job.
57
58   """
59
60
61 def TimeStampNow():
62   """Returns the current timestamp.
63
64   @rtype: tuple
65   @return: the current time in the (seconds, microseconds) format
66
67   """
68   return utils.SplitTime(time.time())
69
70
71 class _QueuedOpCode(object):
72   """Encapsulates an opcode object.
73
74   @ivar log: holds the execution log and consists of tuples
75   of the form C{(log_serial, timestamp, level, message)}
76   @ivar input: the OpCode we encapsulate
77   @ivar status: the current status
78   @ivar result: the result of the LU execution
79   @ivar start_timestamp: timestamp for the start of the execution
80   @ivar stop_timestamp: timestamp for the end of the execution
81
82   """
83   __slots__ = ["input", "status", "result", "log",
84                "start_timestamp", "end_timestamp",
85                "__weakref__"]
86
87   def __init__(self, op):
88     """Constructor for the _QuededOpCode.
89
90     @type op: L{opcodes.OpCode}
91     @param op: the opcode we encapsulate
92
93     """
94     self.input = op
95     self.status = constants.OP_STATUS_QUEUED
96     self.result = None
97     self.log = []
98     self.start_timestamp = None
99     self.end_timestamp = None
100
101   @classmethod
102   def Restore(cls, state):
103     """Restore the _QueuedOpCode from the serialized form.
104
105     @type state: dict
106     @param state: the serialized state
107     @rtype: _QueuedOpCode
108     @return: a new _QueuedOpCode instance
109
110     """
111     obj = _QueuedOpCode.__new__(cls)
112     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
113     obj.status = state["status"]
114     obj.result = state["result"]
115     obj.log = state["log"]
116     obj.start_timestamp = state.get("start_timestamp", None)
117     obj.end_timestamp = state.get("end_timestamp", None)
118     return obj
119
120   def Serialize(self):
121     """Serializes this _QueuedOpCode.
122
123     @rtype: dict
124     @return: the dictionary holding the serialized state
125
126     """
127     return {
128       "input": self.input.__getstate__(),
129       "status": self.status,
130       "result": self.result,
131       "log": self.log,
132       "start_timestamp": self.start_timestamp,
133       "end_timestamp": self.end_timestamp,
134       }
135
136
137 class _QueuedJob(object):
138   """In-memory job representation.
139
140   This is what we use to track the user-submitted jobs. Locking must
141   be taken care of by users of this class.
142
143   @type queue: L{JobQueue}
144   @ivar queue: the parent queue
145   @ivar id: the job ID
146   @type ops: list
147   @ivar ops: the list of _QueuedOpCode that constitute the job
148   @type run_op_index: int
149   @ivar run_op_index: the currently executing opcode, or -1 if
150       we didn't yet start executing
151   @type log_serial: int
152   @ivar log_serial: holds the index for the next log entry
153   @ivar received_timestamp: the timestamp for when the job was received
154   @ivar start_timestmap: the timestamp for start of execution
155   @ivar end_timestamp: the timestamp for end of execution
156   @ivar change: a Condition variable we use for waiting for job changes
157
158   """
159   __slots__ = ["queue", "id", "ops", "run_op_index", "log_serial",
160                "received_timestamp", "start_timestamp", "end_timestamp",
161                "change",
162                "__weakref__"]
163
164   def __init__(self, queue, job_id, ops):
165     """Constructor for the _QueuedJob.
166
167     @type queue: L{JobQueue}
168     @param queue: our parent queue
169     @type job_id: job_id
170     @param job_id: our job id
171     @type ops: list
172     @param ops: the list of opcodes we hold, which will be encapsulated
173         in _QueuedOpCodes
174
175     """
176     if not ops:
177       # TODO: use a better exception
178       raise Exception("No opcodes")
179
180     self.queue = queue
181     self.id = job_id
182     self.ops = [_QueuedOpCode(op) for op in ops]
183     self.run_op_index = -1
184     self.log_serial = 0
185     self.received_timestamp = TimeStampNow()
186     self.start_timestamp = None
187     self.end_timestamp = None
188
189     # Condition to wait for changes
190     self.change = threading.Condition(self.queue._lock)
191
192   @classmethod
193   def Restore(cls, queue, state):
194     """Restore a _QueuedJob from serialized state:
195
196     @type queue: L{JobQueue}
197     @param queue: to which queue the restored job belongs
198     @type state: dict
199     @param state: the serialized state
200     @rtype: _JobQueue
201     @return: the restored _JobQueue instance
202
203     """
204     obj = _QueuedJob.__new__(cls)
205     obj.queue = queue
206     obj.id = state["id"]
207     obj.run_op_index = state["run_op_index"]
208     obj.received_timestamp = state.get("received_timestamp", None)
209     obj.start_timestamp = state.get("start_timestamp", None)
210     obj.end_timestamp = state.get("end_timestamp", None)
211
212     obj.ops = []
213     obj.log_serial = 0
214     for op_state in state["ops"]:
215       op = _QueuedOpCode.Restore(op_state)
216       for log_entry in op.log:
217         obj.log_serial = max(obj.log_serial, log_entry[0])
218       obj.ops.append(op)
219
220     # Condition to wait for changes
221     obj.change = threading.Condition(obj.queue._lock)
222
223     return obj
224
225   def Serialize(self):
226     """Serialize the _JobQueue instance.
227
228     @rtype: dict
229     @return: the serialized state
230
231     """
232     return {
233       "id": self.id,
234       "ops": [op.Serialize() for op in self.ops],
235       "run_op_index": self.run_op_index,
236       "start_timestamp": self.start_timestamp,
237       "end_timestamp": self.end_timestamp,
238       "received_timestamp": self.received_timestamp,
239       }
240
241   def CalcStatus(self):
242     """Compute the status of this job.
243
244     This function iterates over all the _QueuedOpCodes in the job and
245     based on their status, computes the job status.
246
247     The algorithm is:
248       - if we find a cancelled, or finished with error, the job
249         status will be the same
250       - otherwise, the last opcode with the status one of:
251           - waitlock
252           - canceling
253           - running
254
255         will determine the job status
256
257       - otherwise, it means either all opcodes are queued, or success,
258         and the job status will be the same
259
260     @return: the job status
261
262     """
263     status = constants.JOB_STATUS_QUEUED
264
265     all_success = True
266     for op in self.ops:
267       if op.status == constants.OP_STATUS_SUCCESS:
268         continue
269
270       all_success = False
271
272       if op.status == constants.OP_STATUS_QUEUED:
273         pass
274       elif op.status == constants.OP_STATUS_WAITLOCK:
275         status = constants.JOB_STATUS_WAITLOCK
276       elif op.status == constants.OP_STATUS_RUNNING:
277         status = constants.JOB_STATUS_RUNNING
278       elif op.status == constants.OP_STATUS_CANCELING:
279         status = constants.JOB_STATUS_CANCELING
280         break
281       elif op.status == constants.OP_STATUS_ERROR:
282         status = constants.JOB_STATUS_ERROR
283         # The whole job fails if one opcode failed
284         break
285       elif op.status == constants.OP_STATUS_CANCELED:
286         status = constants.OP_STATUS_CANCELED
287         break
288
289     if all_success:
290       status = constants.JOB_STATUS_SUCCESS
291
292     return status
293
294   def GetLogEntries(self, newer_than):
295     """Selectively returns the log entries.
296
297     @type newer_than: None or int
298     @param newer_than: if this is None, return all log entries,
299         otherwise return only the log entries with serial higher
300         than this value
301     @rtype: list
302     @return: the list of the log entries selected
303
304     """
305     if newer_than is None:
306       serial = -1
307     else:
308       serial = newer_than
309
310     entries = []
311     for op in self.ops:
312       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
313
314     return entries
315
316   def MarkUnfinishedOps(self, status, result):
317     """Mark unfinished opcodes with a given status and result.
318
319     This is an utility function for marking all running or waiting to
320     be run opcodes with a given status. Opcodes which are already
321     finalised are not changed.
322
323     @param status: a given opcode status
324     @param result: the opcode result
325
326     """
327     not_marked = True
328     for op in self.ops:
329       if op.status in constants.OPS_FINALIZED:
330         assert not_marked, "Finalized opcodes found after non-finalized ones"
331         continue
332       op.status = status
333       op.result = result
334       not_marked = False
335
336
337 class _JobQueueWorker(workerpool.BaseWorker):
338   """The actual job workers.
339
340   """
341   def _NotifyStart(self):
342     """Mark the opcode as running, not lock-waiting.
343
344     This is called from the mcpu code as a notifier function, when the
345     LU is finally about to start the Exec() method. Of course, to have
346     end-user visible results, the opcode must be initially (before
347     calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
348
349     """
350     assert self.queue, "Queue attribute is missing"
351     assert self.opcode, "Opcode attribute is missing"
352
353     self.queue.acquire()
354     try:
355       assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
356                                     constants.OP_STATUS_CANCELING)
357
358       # Cancel here if we were asked to
359       if self.opcode.status == constants.OP_STATUS_CANCELING:
360         raise CancelJob()
361
362       self.opcode.status = constants.OP_STATUS_RUNNING
363     finally:
364       self.queue.release()
365
366   def RunTask(self, job):
367     """Job executor.
368
369     This functions processes a job. It is closely tied to the _QueuedJob and
370     _QueuedOpCode classes.
371
372     @type job: L{_QueuedJob}
373     @param job: the job to be processed
374
375     """
376     logging.info("Worker %s processing job %s",
377                   self.worker_id, job.id)
378     proc = mcpu.Processor(self.pool.queue.context)
379     self.queue = queue = job.queue
380     try:
381       try:
382         count = len(job.ops)
383         for idx, op in enumerate(job.ops):
384           op_summary = op.input.Summary()
385           if op.status == constants.OP_STATUS_SUCCESS:
386             # this is a job that was partially completed before master
387             # daemon shutdown, so it can be expected that some opcodes
388             # are already completed successfully (if any did error
389             # out, then the whole job should have been aborted and not
390             # resubmitted for processing)
391             logging.info("Op %s/%s: opcode %s already processed, skipping",
392                          idx + 1, count, op_summary)
393             continue
394           try:
395             logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
396                          op_summary)
397
398             queue.acquire()
399             try:
400               if op.status == constants.OP_STATUS_CANCELED:
401                 raise CancelJob()
402               assert op.status == constants.OP_STATUS_QUEUED
403               job.run_op_index = idx
404               op.status = constants.OP_STATUS_WAITLOCK
405               op.result = None
406               op.start_timestamp = TimeStampNow()
407               if idx == 0: # first opcode
408                 job.start_timestamp = op.start_timestamp
409               queue.UpdateJobUnlocked(job)
410
411               input_opcode = op.input
412             finally:
413               queue.release()
414
415             def _Log(*args):
416               """Append a log entry.
417
418               """
419               assert len(args) < 3
420
421               if len(args) == 1:
422                 log_type = constants.ELOG_MESSAGE
423                 log_msg = args[0]
424               else:
425                 log_type, log_msg = args
426
427               # The time is split to make serialization easier and not lose
428               # precision.
429               timestamp = utils.SplitTime(time.time())
430
431               queue.acquire()
432               try:
433                 job.log_serial += 1
434                 op.log.append((job.log_serial, timestamp, log_type, log_msg))
435
436                 job.change.notifyAll()
437               finally:
438                 queue.release()
439
440             # Make sure not to hold lock while _Log is called
441             self.opcode = op
442             result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
443
444             queue.acquire()
445             try:
446               op.status = constants.OP_STATUS_SUCCESS
447               op.result = result
448               op.end_timestamp = TimeStampNow()
449               queue.UpdateJobUnlocked(job)
450             finally:
451               queue.release()
452
453             logging.info("Op %s/%s: Successfully finished opcode %s",
454                          idx + 1, count, op_summary)
455           except CancelJob:
456             # Will be handled further up
457             raise
458           except Exception, err:
459             queue.acquire()
460             try:
461               try:
462                 op.status = constants.OP_STATUS_ERROR
463                 op.result = str(err)
464                 op.end_timestamp = TimeStampNow()
465                 logging.info("Op %s/%s: Error in opcode %s: %s",
466                              idx + 1, count, op_summary, err)
467               finally:
468                 queue.UpdateJobUnlocked(job)
469             finally:
470               queue.release()
471             raise
472
473       except CancelJob:
474         queue.acquire()
475         try:
476           queue.CancelJobUnlocked(job)
477         finally:
478           queue.release()
479       except errors.GenericError, err:
480         logging.exception("Ganeti exception")
481       except:
482         logging.exception("Unhandled exception")
483     finally:
484       queue.acquire()
485       try:
486         try:
487           job.run_op_index = -1
488           job.end_timestamp = TimeStampNow()
489           queue.UpdateJobUnlocked(job)
490         finally:
491           job_id = job.id
492           status = job.CalcStatus()
493       finally:
494         queue.release()
495       logging.info("Worker %s finished job %s, status = %s",
496                    self.worker_id, job_id, status)
497
498
499 class _JobQueueWorkerPool(workerpool.WorkerPool):
500   """Simple class implementing a job-processing workerpool.
501
502   """
503   def __init__(self, queue):
504     super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
505                                               _JobQueueWorker)
506     self.queue = queue
507
508
509 class JobQueue(object):
510   """Queue used to manage the jobs.
511
512   @cvar _RE_JOB_FILE: regex matching the valid job file names
513
514   """
515   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
516
517   def _RequireOpenQueue(fn):
518     """Decorator for "public" functions.
519
520     This function should be used for all 'public' functions. That is,
521     functions usually called from other classes.
522
523     @warning: Use this decorator only after utils.LockedMethod!
524
525     Example::
526       @utils.LockedMethod
527       @_RequireOpenQueue
528       def Example(self):
529         pass
530
531     """
532     def wrapper(self, *args, **kwargs):
533       assert self._queue_lock is not None, "Queue should be open"
534       return fn(self, *args, **kwargs)
535     return wrapper
536
537   def __init__(self, context):
538     """Constructor for JobQueue.
539
540     The constructor will initialize the job queue object and then
541     start loading the current jobs from disk, either for starting them
542     (if they were queue) or for aborting them (if they were already
543     running).
544
545     @type context: GanetiContext
546     @param context: the context object for access to the configuration
547         data and other ganeti objects
548
549     """
550     self.context = context
551     self._memcache = weakref.WeakValueDictionary()
552     self._my_hostname = utils.HostInfo().name
553
554     # Locking
555     self._lock = threading.Lock()
556     self.acquire = self._lock.acquire
557     self.release = self._lock.release
558
559     # Initialize
560     self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
561
562     # Read serial file
563     self._last_serial = jstore.ReadSerial()
564     assert self._last_serial is not None, ("Serial file was modified between"
565                                            " check in jstore and here")
566
567     # Get initial list of nodes
568     self._nodes = dict((n.name, n.primary_ip)
569                        for n in self.context.cfg.GetAllNodesInfo().values()
570                        if n.master_candidate)
571
572     # Remove master node
573     try:
574       del self._nodes[self._my_hostname]
575     except KeyError:
576       pass
577
578     # TODO: Check consistency across nodes
579
580     # Setup worker pool
581     self._wpool = _JobQueueWorkerPool(self)
582     try:
583       # We need to lock here because WorkerPool.AddTask() may start a job while
584       # we're still doing our work.
585       self.acquire()
586       try:
587         logging.info("Inspecting job queue")
588
589         all_job_ids = self._GetJobIDsUnlocked()
590         jobs_count = len(all_job_ids)
591         lastinfo = time.time()
592         for idx, job_id in enumerate(all_job_ids):
593           # Give an update every 1000 jobs or 10 seconds
594           if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
595               idx == (jobs_count - 1)):
596             logging.info("Job queue inspection: %d/%d (%0.1f %%)",
597                          idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
598             lastinfo = time.time()
599
600           job = self._LoadJobUnlocked(job_id)
601
602           # a failure in loading the job can cause 'None' to be returned
603           if job is None:
604             continue
605
606           status = job.CalcStatus()
607
608           if status in (constants.JOB_STATUS_QUEUED, ):
609             self._wpool.AddTask(job)
610
611           elif status in (constants.JOB_STATUS_RUNNING,
612                           constants.JOB_STATUS_WAITLOCK,
613                           constants.JOB_STATUS_CANCELING):
614             logging.warning("Unfinished job %s found: %s", job.id, job)
615             try:
616               job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
617                                     "Unclean master daemon shutdown")
618             finally:
619               self.UpdateJobUnlocked(job)
620
621         logging.info("Job queue inspection finished")
622       finally:
623         self.release()
624     except:
625       self._wpool.TerminateWorkers()
626       raise
627
628   @utils.LockedMethod
629   @_RequireOpenQueue
630   def AddNode(self, node):
631     """Register a new node with the queue.
632
633     @type node: L{objects.Node}
634     @param node: the node object to be added
635
636     """
637     node_name = node.name
638     assert node_name != self._my_hostname
639
640     # Clean queue directory on added node
641     result = rpc.RpcRunner.call_jobqueue_purge(node_name)
642     msg = result.RemoteFailMsg()
643     if msg:
644       logging.warning("Cannot cleanup queue directory on node %s: %s",
645                       node_name, msg)
646
647     if not node.master_candidate:
648       # remove if existing, ignoring errors
649       self._nodes.pop(node_name, None)
650       # and skip the replication of the job ids
651       return
652
653     # Upload the whole queue excluding archived jobs
654     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
655
656     # Upload current serial file
657     files.append(constants.JOB_QUEUE_SERIAL_FILE)
658
659     for file_name in files:
660       # Read file content
661       fd = open(file_name, "r")
662       try:
663         content = fd.read()
664       finally:
665         fd.close()
666
667       result = rpc.RpcRunner.call_jobqueue_update([node_name],
668                                                   [node.primary_ip],
669                                                   file_name, content)
670       msg = result[node_name].RemoteFailMsg()
671       if msg:
672         logging.error("Failed to upload file %s to node %s: %s",
673                       file_name, node_name, msg)
674
675     self._nodes[node_name] = node.primary_ip
676
677   @utils.LockedMethod
678   @_RequireOpenQueue
679   def RemoveNode(self, node_name):
680     """Callback called when removing nodes from the cluster.
681
682     @type node_name: str
683     @param node_name: the name of the node to remove
684
685     """
686     try:
687       # The queue is removed by the "leave node" RPC call.
688       del self._nodes[node_name]
689     except KeyError:
690       pass
691
692   def _CheckRpcResult(self, result, nodes, failmsg):
693     """Verifies the status of an RPC call.
694
695     Since we aim to keep consistency should this node (the current
696     master) fail, we will log errors if our rpc fail, and especially
697     log the case when more than half of the nodes fails.
698
699     @param result: the data as returned from the rpc call
700     @type nodes: list
701     @param nodes: the list of nodes we made the call to
702     @type failmsg: str
703     @param failmsg: the identifier to be used for logging
704
705     """
706     failed = []
707     success = []
708
709     for node in nodes:
710       msg = result[node].RemoteFailMsg()
711       if msg:
712         failed.append(node)
713         logging.error("RPC call %s failed on node %s: %s",
714                       result[node].call, node, msg)
715       else:
716         success.append(node)
717
718     # +1 for the master node
719     if (len(success) + 1) < len(failed):
720       # TODO: Handle failing nodes
721       logging.error("More than half of the nodes failed")
722
723   def _GetNodeIp(self):
724     """Helper for returning the node name/ip list.
725
726     @rtype: (list, list)
727     @return: a tuple of two lists, the first one with the node
728         names and the second one with the node addresses
729
730     """
731     name_list = self._nodes.keys()
732     addr_list = [self._nodes[name] for name in name_list]
733     return name_list, addr_list
734
735   def _WriteAndReplicateFileUnlocked(self, file_name, data):
736     """Writes a file locally and then replicates it to all nodes.
737
738     This function will replace the contents of a file on the local
739     node and then replicate it to all the other nodes we have.
740
741     @type file_name: str
742     @param file_name: the path of the file to be replicated
743     @type data: str
744     @param data: the new contents of the file
745
746     """
747     utils.WriteFile(file_name, data=data)
748
749     names, addrs = self._GetNodeIp()
750     result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
751     self._CheckRpcResult(result, self._nodes,
752                          "Updating %s" % file_name)
753
754   def _RenameFilesUnlocked(self, rename):
755     """Renames a file locally and then replicate the change.
756
757     This function will rename a file in the local queue directory
758     and then replicate this rename to all the other nodes we have.
759
760     @type rename: list of (old, new)
761     @param rename: List containing tuples mapping old to new names
762
763     """
764     # Rename them locally
765     for old, new in rename:
766       utils.RenameFile(old, new, mkdir=True)
767
768     # ... and on all nodes
769     names, addrs = self._GetNodeIp()
770     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
771     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
772
773   def _FormatJobID(self, job_id):
774     """Convert a job ID to string format.
775
776     Currently this just does C{str(job_id)} after performing some
777     checks, but if we want to change the job id format this will
778     abstract this change.
779
780     @type job_id: int or long
781     @param job_id: the numeric job id
782     @rtype: str
783     @return: the formatted job id
784
785     """
786     if not isinstance(job_id, (int, long)):
787       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
788     if job_id < 0:
789       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
790
791     return str(job_id)
792
793   @classmethod
794   def _GetArchiveDirectory(cls, job_id):
795     """Returns the archive directory for a job.
796
797     @type job_id: str
798     @param job_id: Job identifier
799     @rtype: str
800     @return: Directory name
801
802     """
803     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
804
805   def _NewSerialUnlocked(self):
806     """Generates a new job identifier.
807
808     Job identifiers are unique during the lifetime of a cluster.
809
810     @rtype: str
811     @return: a string representing the job identifier.
812
813     """
814     # New number
815     serial = self._last_serial + 1
816
817     # Write to file
818     self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
819                                         "%s\n" % serial)
820
821     # Keep it only if we were able to write the file
822     self._last_serial = serial
823
824     return self._FormatJobID(serial)
825
826   @staticmethod
827   def _GetJobPath(job_id):
828     """Returns the job file for a given job id.
829
830     @type job_id: str
831     @param job_id: the job identifier
832     @rtype: str
833     @return: the path to the job file
834
835     """
836     return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
837
838   @classmethod
839   def _GetArchivedJobPath(cls, job_id):
840     """Returns the archived job file for a give job id.
841
842     @type job_id: str
843     @param job_id: the job identifier
844     @rtype: str
845     @return: the path to the archived job file
846
847     """
848     path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
849     return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
850
851   @classmethod
852   def _ExtractJobID(cls, name):
853     """Extract the job id from a filename.
854
855     @type name: str
856     @param name: the job filename
857     @rtype: job id or None
858     @return: the job id corresponding to the given filename,
859         or None if the filename does not represent a valid
860         job file
861
862     """
863     m = cls._RE_JOB_FILE.match(name)
864     if m:
865       return m.group(1)
866     else:
867       return None
868
869   def _GetJobIDsUnlocked(self, archived=False):
870     """Return all known job IDs.
871
872     If the parameter archived is True, archived jobs IDs will be
873     included. Currently this argument is unused.
874
875     The method only looks at disk because it's a requirement that all
876     jobs are present on disk (so in the _memcache we don't have any
877     extra IDs).
878
879     @rtype: list
880     @return: the list of job IDs
881
882     """
883     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
884     jlist = utils.NiceSort(jlist)
885     return jlist
886
887   def _ListJobFiles(self):
888     """Returns the list of current job files.
889
890     @rtype: list
891     @return: the list of job file names
892
893     """
894     return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
895             if self._RE_JOB_FILE.match(name)]
896
897   def _LoadJobUnlocked(self, job_id):
898     """Loads a job from the disk or memory.
899
900     Given a job id, this will return the cached job object if
901     existing, or try to load the job from the disk. If loading from
902     disk, it will also add the job to the cache.
903
904     @param job_id: the job id
905     @rtype: L{_QueuedJob} or None
906     @return: either None or the job object
907
908     """
909     job = self._memcache.get(job_id, None)
910     if job:
911       logging.debug("Found job %s in memcache", job_id)
912       return job
913
914     filepath = self._GetJobPath(job_id)
915     logging.debug("Loading job from %s", filepath)
916     try:
917       fd = open(filepath, "r")
918     except IOError, err:
919       if err.errno in (errno.ENOENT, ):
920         return None
921       raise
922     try:
923       data = serializer.LoadJson(fd.read())
924     finally:
925       fd.close()
926
927     try:
928       job = _QueuedJob.Restore(self, data)
929     except Exception, err:
930       new_path = self._GetArchivedJobPath(job_id)
931       if filepath == new_path:
932         # job already archived (future case)
933         logging.exception("Can't parse job %s", job_id)
934       else:
935         # non-archived case
936         logging.exception("Can't parse job %s, will archive.", job_id)
937         self._RenameFilesUnlocked([(filepath, new_path)])
938       return None
939
940     self._memcache[job_id] = job
941     logging.debug("Added job %s to the cache", job_id)
942     return job
943
944   def _GetJobsUnlocked(self, job_ids):
945     """Return a list of jobs based on their IDs.
946
947     @type job_ids: list
948     @param job_ids: either an empty list (meaning all jobs),
949         or a list of job IDs
950     @rtype: list
951     @return: the list of job objects
952
953     """
954     if not job_ids:
955       job_ids = self._GetJobIDsUnlocked()
956
957     return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
958
959   @staticmethod
960   def _IsQueueMarkedDrain():
961     """Check if the queue is marked from drain.
962
963     This currently uses the queue drain file, which makes it a
964     per-node flag. In the future this can be moved to the config file.
965
966     @rtype: boolean
967     @return: True of the job queue is marked for draining
968
969     """
970     return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
971
972   @staticmethod
973   def SetDrainFlag(drain_flag):
974     """Sets the drain flag for the queue.
975
976     This is similar to the function L{backend.JobQueueSetDrainFlag},
977     and in the future we might merge them.
978
979     @type drain_flag: boolean
980     @param drain_flag: Whether to set or unset the drain flag
981
982     """
983     if drain_flag:
984       utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
985     else:
986       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
987     return True
988
989   @_RequireOpenQueue
990   def _SubmitJobUnlocked(self, ops):
991     """Create and store a new job.
992
993     This enters the job into our job queue and also puts it on the new
994     queue, in order for it to be picked up by the queue processors.
995
996     @type ops: list
997     @param ops: The list of OpCodes that will become the new job.
998     @rtype: job ID
999     @return: the job ID of the newly created job
1000     @raise errors.JobQueueDrainError: if the job is marked for draining
1001
1002     """
1003     if self._IsQueueMarkedDrain():
1004       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1005
1006     # Check job queue size
1007     size = len(self._ListJobFiles())
1008     if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
1009       # TODO: Autoarchive jobs. Make sure it's not done on every job
1010       # submission, though.
1011       #size = ...
1012       pass
1013
1014     if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1015       raise errors.JobQueueFull()
1016
1017     # Get job identifier
1018     job_id = self._NewSerialUnlocked()
1019     job = _QueuedJob(self, job_id, ops)
1020
1021     # Write to disk
1022     self.UpdateJobUnlocked(job)
1023
1024     logging.debug("Adding new job %s to the cache", job_id)
1025     self._memcache[job_id] = job
1026
1027     # Add to worker pool
1028     self._wpool.AddTask(job)
1029
1030     return job.id
1031
1032   @utils.LockedMethod
1033   @_RequireOpenQueue
1034   def SubmitJob(self, ops):
1035     """Create and store a new job.
1036
1037     @see: L{_SubmitJobUnlocked}
1038
1039     """
1040     return self._SubmitJobUnlocked(ops)
1041
1042   @utils.LockedMethod
1043   @_RequireOpenQueue
1044   def SubmitManyJobs(self, jobs):
1045     """Create and store multiple jobs.
1046
1047     @see: L{_SubmitJobUnlocked}
1048
1049     """
1050     results = []
1051     for ops in jobs:
1052       try:
1053         data = self._SubmitJobUnlocked(ops)
1054         status = True
1055       except errors.GenericError, err:
1056         data = str(err)
1057         status = False
1058       results.append((status, data))
1059
1060     return results
1061
1062
1063   @_RequireOpenQueue
1064   def UpdateJobUnlocked(self, job):
1065     """Update a job's on disk storage.
1066
1067     After a job has been modified, this function needs to be called in
1068     order to write the changes to disk and replicate them to the other
1069     nodes.
1070
1071     @type job: L{_QueuedJob}
1072     @param job: the changed job
1073
1074     """
1075     filename = self._GetJobPath(job.id)
1076     data = serializer.DumpJson(job.Serialize(), indent=False)
1077     logging.debug("Writing job %s to %s", job.id, filename)
1078     self._WriteAndReplicateFileUnlocked(filename, data)
1079
1080     # Notify waiters about potential changes
1081     job.change.notifyAll()
1082
1083   @utils.LockedMethod
1084   @_RequireOpenQueue
1085   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1086                         timeout):
1087     """Waits for changes in a job.
1088
1089     @type job_id: string
1090     @param job_id: Job identifier
1091     @type fields: list of strings
1092     @param fields: Which fields to check for changes
1093     @type prev_job_info: list or None
1094     @param prev_job_info: Last job information returned
1095     @type prev_log_serial: int
1096     @param prev_log_serial: Last job message serial number
1097     @type timeout: float
1098     @param timeout: maximum time to wait
1099     @rtype: tuple (job info, log entries)
1100     @return: a tuple of the job information as required via
1101         the fields parameter, and the log entries as a list
1102
1103         if the job has not changed and the timeout has expired,
1104         we instead return a special value,
1105         L{constants.JOB_NOTCHANGED}, which should be interpreted
1106         as such by the clients
1107
1108     """
1109     logging.debug("Waiting for changes in job %s", job_id)
1110
1111     job_info = None
1112     log_entries = None
1113
1114     end_time = time.time() + timeout
1115     while True:
1116       delta_time = end_time - time.time()
1117       if delta_time < 0:
1118         return constants.JOB_NOTCHANGED
1119
1120       job = self._LoadJobUnlocked(job_id)
1121       if not job:
1122         logging.debug("Job %s not found", job_id)
1123         break
1124
1125       status = job.CalcStatus()
1126       job_info = self._GetJobInfoUnlocked(job, fields)
1127       log_entries = job.GetLogEntries(prev_log_serial)
1128
1129       # Serializing and deserializing data can cause type changes (e.g. from
1130       # tuple to list) or precision loss. We're doing it here so that we get
1131       # the same modifications as the data received from the client. Without
1132       # this, the comparison afterwards might fail without the data being
1133       # significantly different.
1134       job_info = serializer.LoadJson(serializer.DumpJson(job_info))
1135       log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
1136
1137       if status not in (constants.JOB_STATUS_QUEUED,
1138                         constants.JOB_STATUS_RUNNING,
1139                         constants.JOB_STATUS_WAITLOCK):
1140         # Don't even try to wait if the job is no longer running, there will be
1141         # no changes.
1142         break
1143
1144       if (prev_job_info != job_info or
1145           (log_entries and prev_log_serial != log_entries[0][0])):
1146         break
1147
1148       logging.debug("Waiting again")
1149
1150       # Release the queue lock while waiting
1151       job.change.wait(delta_time)
1152
1153     logging.debug("Job %s changed", job_id)
1154
1155     if job_info is None and log_entries is None:
1156       return None
1157     else:
1158       return (job_info, log_entries)
1159
1160   @utils.LockedMethod
1161   @_RequireOpenQueue
1162   def CancelJob(self, job_id):
1163     """Cancels a job.
1164
1165     This will only succeed if the job has not started yet.
1166
1167     @type job_id: string
1168     @param job_id: job ID of job to be cancelled.
1169
1170     """
1171     logging.info("Cancelling job %s", job_id)
1172
1173     job = self._LoadJobUnlocked(job_id)
1174     if not job:
1175       logging.debug("Job %s not found", job_id)
1176       return (False, "Job %s not found" % job_id)
1177
1178     job_status = job.CalcStatus()
1179
1180     if job_status not in (constants.JOB_STATUS_QUEUED,
1181                           constants.JOB_STATUS_WAITLOCK):
1182       logging.debug("Job %s is no longer waiting in the queue", job.id)
1183       return (False, "Job %s is no longer waiting in the queue" % job.id)
1184
1185     if job_status == constants.JOB_STATUS_QUEUED:
1186       self.CancelJobUnlocked(job)
1187       return (True, "Job %s canceled" % job.id)
1188
1189     elif job_status == constants.JOB_STATUS_WAITLOCK:
1190       # The worker will notice the new status and cancel the job
1191       try:
1192         job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1193       finally:
1194         self.UpdateJobUnlocked(job)
1195       return (True, "Job %s will be canceled" % job.id)
1196
1197   @_RequireOpenQueue
1198   def CancelJobUnlocked(self, job):
1199     """Marks a job as canceled.
1200
1201     """
1202     try:
1203       job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1204                             "Job canceled by request")
1205     finally:
1206       self.UpdateJobUnlocked(job)
1207
1208   @_RequireOpenQueue
1209   def _ArchiveJobsUnlocked(self, jobs):
1210     """Archives jobs.
1211
1212     @type jobs: list of L{_QueuedJob}
1213     @param jobs: Job objects
1214     @rtype: int
1215     @return: Number of archived jobs
1216
1217     """
1218     archive_jobs = []
1219     rename_files = []
1220     for job in jobs:
1221       if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1222                                   constants.JOB_STATUS_SUCCESS,
1223                                   constants.JOB_STATUS_ERROR):
1224         logging.debug("Job %s is not yet done", job.id)
1225         continue
1226
1227       archive_jobs.append(job)
1228
1229       old = self._GetJobPath(job.id)
1230       new = self._GetArchivedJobPath(job.id)
1231       rename_files.append((old, new))
1232
1233     # TODO: What if 1..n files fail to rename?
1234     self._RenameFilesUnlocked(rename_files)
1235
1236     logging.debug("Successfully archived job(s) %s",
1237                   ", ".join(job.id for job in archive_jobs))
1238
1239     return len(archive_jobs)
1240
1241   @utils.LockedMethod
1242   @_RequireOpenQueue
1243   def ArchiveJob(self, job_id):
1244     """Archives a job.
1245
1246     This is just a wrapper over L{_ArchiveJobsUnlocked}.
1247
1248     @type job_id: string
1249     @param job_id: Job ID of job to be archived.
1250     @rtype: bool
1251     @return: Whether job was archived
1252
1253     """
1254     logging.info("Archiving job %s", job_id)
1255
1256     job = self._LoadJobUnlocked(job_id)
1257     if not job:
1258       logging.debug("Job %s not found", job_id)
1259       return False
1260
1261     return self._ArchiveJobsUnlocked([job]) == 1
1262
1263   @utils.LockedMethod
1264   @_RequireOpenQueue
1265   def AutoArchiveJobs(self, age, timeout):
1266     """Archives all jobs based on age.
1267
1268     The method will archive all jobs which are older than the age
1269     parameter. For jobs that don't have an end timestamp, the start
1270     timestamp will be considered. The special '-1' age will cause
1271     archival of all jobs (that are not running or queued).
1272
1273     @type age: int
1274     @param age: the minimum age in seconds
1275
1276     """
1277     logging.info("Archiving jobs with age more than %s seconds", age)
1278
1279     now = time.time()
1280     end_time = now + timeout
1281     archived_count = 0
1282     last_touched = 0
1283
1284     all_job_ids = self._GetJobIDsUnlocked(archived=False)
1285     pending = []
1286     for idx, job_id in enumerate(all_job_ids):
1287       last_touched = idx
1288
1289       # Not optimal because jobs could be pending
1290       # TODO: Measure average duration for job archival and take number of
1291       # pending jobs into account.
1292       if time.time() > end_time:
1293         break
1294
1295       # Returns None if the job failed to load
1296       job = self._LoadJobUnlocked(job_id)
1297       if job:
1298         if job.end_timestamp is None:
1299           if job.start_timestamp is None:
1300             job_age = job.received_timestamp
1301           else:
1302             job_age = job.start_timestamp
1303         else:
1304           job_age = job.end_timestamp
1305
1306         if age == -1 or now - job_age[0] > age:
1307           pending.append(job)
1308
1309           # Archive 10 jobs at a time
1310           if len(pending) >= 10:
1311             archived_count += self._ArchiveJobsUnlocked(pending)
1312             pending = []
1313
1314     if pending:
1315       archived_count += self._ArchiveJobsUnlocked(pending)
1316
1317     return (archived_count, len(all_job_ids) - last_touched - 1)
1318
1319   def _GetJobInfoUnlocked(self, job, fields):
1320     """Returns information about a job.
1321
1322     @type job: L{_QueuedJob}
1323     @param job: the job which we query
1324     @type fields: list
1325     @param fields: names of fields to return
1326     @rtype: list
1327     @return: list with one element for each field
1328     @raise errors.OpExecError: when an invalid field
1329         has been passed
1330
1331     """
1332     row = []
1333     for fname in fields:
1334       if fname == "id":
1335         row.append(job.id)
1336       elif fname == "status":
1337         row.append(job.CalcStatus())
1338       elif fname == "ops":
1339         row.append([op.input.__getstate__() for op in job.ops])
1340       elif fname == "opresult":
1341         row.append([op.result for op in job.ops])
1342       elif fname == "opstatus":
1343         row.append([op.status for op in job.ops])
1344       elif fname == "oplog":
1345         row.append([op.log for op in job.ops])
1346       elif fname == "opstart":
1347         row.append([op.start_timestamp for op in job.ops])
1348       elif fname == "opend":
1349         row.append([op.end_timestamp for op in job.ops])
1350       elif fname == "received_ts":
1351         row.append(job.received_timestamp)
1352       elif fname == "start_ts":
1353         row.append(job.start_timestamp)
1354       elif fname == "end_ts":
1355         row.append(job.end_timestamp)
1356       elif fname == "summary":
1357         row.append([op.input.Summary() for op in job.ops])
1358       else:
1359         raise errors.OpExecError("Invalid job query field '%s'" % fname)
1360     return row
1361
1362   @utils.LockedMethod
1363   @_RequireOpenQueue
1364   def QueryJobs(self, job_ids, fields):
1365     """Returns a list of jobs in queue.
1366
1367     This is a wrapper of L{_GetJobsUnlocked}, which actually does the
1368     processing for each job.
1369
1370     @type job_ids: list
1371     @param job_ids: sequence of job identifiers or None for all
1372     @type fields: list
1373     @param fields: names of fields to return
1374     @rtype: list
1375     @return: list one element per job, each element being list with
1376         the requested fields
1377
1378     """
1379     jobs = []
1380
1381     for job in self._GetJobsUnlocked(job_ids):
1382       if job is None:
1383         jobs.append(None)
1384       else:
1385         jobs.append(self._GetJobInfoUnlocked(job, fields))
1386
1387     return jobs
1388
1389   @utils.LockedMethod
1390   @_RequireOpenQueue
1391   def Shutdown(self):
1392     """Stops the job queue.
1393
1394     This shutdowns all the worker threads an closes the queue.
1395
1396     """
1397     self._wpool.TerminateWorkers()
1398
1399     self._queue_lock.Close()
1400     self._queue_lock = None