KVM: fix RebootInstance
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import os
33 import logging
34 import threading
35 import errno
36 import re
37 import time
38 import weakref
39
40 from ganeti import constants
41 from ganeti import serializer
42 from ganeti import workerpool
43 from ganeti import opcodes
44 from ganeti import errors
45 from ganeti import mcpu
46 from ganeti import utils
47 from ganeti import jstore
48 from ganeti import rpc
49
50
51 JOBQUEUE_THREADS = 25
52 JOBS_PER_ARCHIVE_DIRECTORY = 10000
53
54
55 class CancelJob(Exception):
56   """Special exception to cancel a job.
57
58   """
59
60
61 def TimeStampNow():
62   """Returns the current timestamp.
63
64   @rtype: tuple
65   @return: the current time in the (seconds, microseconds) format
66
67   """
68   return utils.SplitTime(time.time())
69
70
71 class _QueuedOpCode(object):
72   """Encasulates an opcode object.
73
74   @ivar log: holds the execution log and consists of tuples
75   of the form C{(log_serial, timestamp, level, message)}
76   @ivar input: the OpCode we encapsulate
77   @ivar status: the current status
78   @ivar result: the result of the LU execution
79   @ivar start_timestamp: timestamp for the start of the execution
80   @ivar stop_timestamp: timestamp for the end of the execution
81
82   """
83   def __init__(self, op):
84     """Constructor for the _QuededOpCode.
85
86     @type op: L{opcodes.OpCode}
87     @param op: the opcode we encapsulate
88
89     """
90     self.input = op
91     self.status = constants.OP_STATUS_QUEUED
92     self.result = None
93     self.log = []
94     self.start_timestamp = None
95     self.end_timestamp = None
96
97   @classmethod
98   def Restore(cls, state):
99     """Restore the _QueuedOpCode from the serialized form.
100
101     @type state: dict
102     @param state: the serialized state
103     @rtype: _QueuedOpCode
104     @return: a new _QueuedOpCode instance
105
106     """
107     obj = _QueuedOpCode.__new__(cls)
108     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
109     obj.status = state["status"]
110     obj.result = state["result"]
111     obj.log = state["log"]
112     obj.start_timestamp = state.get("start_timestamp", None)
113     obj.end_timestamp = state.get("end_timestamp", None)
114     return obj
115
116   def Serialize(self):
117     """Serializes this _QueuedOpCode.
118
119     @rtype: dict
120     @return: the dictionary holding the serialized state
121
122     """
123     return {
124       "input": self.input.__getstate__(),
125       "status": self.status,
126       "result": self.result,
127       "log": self.log,
128       "start_timestamp": self.start_timestamp,
129       "end_timestamp": self.end_timestamp,
130       }
131
132
133 class _QueuedJob(object):
134   """In-memory job representation.
135
136   This is what we use to track the user-submitted jobs. Locking must
137   be taken care of by users of this class.
138
139   @type queue: L{JobQueue}
140   @ivar queue: the parent queue
141   @ivar id: the job ID
142   @type ops: list
143   @ivar ops: the list of _QueuedOpCode that constitute the job
144   @type run_op_index: int
145   @ivar run_op_index: the currently executing opcode, or -1 if
146       we didn't yet start executing
147   @type log_serial: int
148   @ivar log_serial: holds the index for the next log entry
149   @ivar received_timestamp: the timestamp for when the job was received
150   @ivar start_timestmap: the timestamp for start of execution
151   @ivar end_timestamp: the timestamp for end of execution
152   @ivar change: a Condition variable we use for waiting for job changes
153
154   """
155   def __init__(self, queue, job_id, ops):
156     """Constructor for the _QueuedJob.
157
158     @type queue: L{JobQueue}
159     @param queue: our parent queue
160     @type job_id: job_id
161     @param job_id: our job id
162     @type ops: list
163     @param ops: the list of opcodes we hold, which will be encapsulated
164         in _QueuedOpCodes
165
166     """
167     if not ops:
168       # TODO: use a better exception
169       raise Exception("No opcodes")
170
171     self.queue = queue
172     self.id = job_id
173     self.ops = [_QueuedOpCode(op) for op in ops]
174     self.run_op_index = -1
175     self.log_serial = 0
176     self.received_timestamp = TimeStampNow()
177     self.start_timestamp = None
178     self.end_timestamp = None
179
180     # Condition to wait for changes
181     self.change = threading.Condition(self.queue._lock)
182
183   @classmethod
184   def Restore(cls, queue, state):
185     """Restore a _QueuedJob from serialized state:
186
187     @type queue: L{JobQueue}
188     @param queue: to which queue the restored job belongs
189     @type state: dict
190     @param state: the serialized state
191     @rtype: _JobQueue
192     @return: the restored _JobQueue instance
193
194     """
195     obj = _QueuedJob.__new__(cls)
196     obj.queue = queue
197     obj.id = state["id"]
198     obj.run_op_index = state["run_op_index"]
199     obj.received_timestamp = state.get("received_timestamp", None)
200     obj.start_timestamp = state.get("start_timestamp", None)
201     obj.end_timestamp = state.get("end_timestamp", None)
202
203     obj.ops = []
204     obj.log_serial = 0
205     for op_state in state["ops"]:
206       op = _QueuedOpCode.Restore(op_state)
207       for log_entry in op.log:
208         obj.log_serial = max(obj.log_serial, log_entry[0])
209       obj.ops.append(op)
210
211     # Condition to wait for changes
212     obj.change = threading.Condition(obj.queue._lock)
213
214     return obj
215
216   def Serialize(self):
217     """Serialize the _JobQueue instance.
218
219     @rtype: dict
220     @return: the serialized state
221
222     """
223     return {
224       "id": self.id,
225       "ops": [op.Serialize() for op in self.ops],
226       "run_op_index": self.run_op_index,
227       "start_timestamp": self.start_timestamp,
228       "end_timestamp": self.end_timestamp,
229       "received_timestamp": self.received_timestamp,
230       }
231
232   def CalcStatus(self):
233     """Compute the status of this job.
234
235     This function iterates over all the _QueuedOpCodes in the job and
236     based on their status, computes the job status.
237
238     The algorithm is:
239       - if we find a cancelled, or finished with error, the job
240         status will be the same
241       - otherwise, the last opcode with the status one of:
242           - waitlock
243           - canceling
244           - running
245
246         will determine the job status
247
248       - otherwise, it means either all opcodes are queued, or success,
249         and the job status will be the same
250
251     @return: the job status
252
253     """
254     status = constants.JOB_STATUS_QUEUED
255
256     all_success = True
257     for op in self.ops:
258       if op.status == constants.OP_STATUS_SUCCESS:
259         continue
260
261       all_success = False
262
263       if op.status == constants.OP_STATUS_QUEUED:
264         pass
265       elif op.status == constants.OP_STATUS_WAITLOCK:
266         status = constants.JOB_STATUS_WAITLOCK
267       elif op.status == constants.OP_STATUS_RUNNING:
268         status = constants.JOB_STATUS_RUNNING
269       elif op.status == constants.OP_STATUS_CANCELING:
270         status = constants.JOB_STATUS_CANCELING
271         break
272       elif op.status == constants.OP_STATUS_ERROR:
273         status = constants.JOB_STATUS_ERROR
274         # The whole job fails if one opcode failed
275         break
276       elif op.status == constants.OP_STATUS_CANCELED:
277         status = constants.OP_STATUS_CANCELED
278         break
279
280     if all_success:
281       status = constants.JOB_STATUS_SUCCESS
282
283     return status
284
285   def GetLogEntries(self, newer_than):
286     """Selectively returns the log entries.
287
288     @type newer_than: None or int
289     @param newer_than: if this is None, return all log enties,
290         otherwise return only the log entries with serial higher
291         than this value
292     @rtype: list
293     @return: the list of the log entries selected
294
295     """
296     if newer_than is None:
297       serial = -1
298     else:
299       serial = newer_than
300
301     entries = []
302     for op in self.ops:
303       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
304
305     return entries
306
307
308 class _JobQueueWorker(workerpool.BaseWorker):
309   """The actual job workers.
310
311   """
312   def _NotifyStart(self):
313     """Mark the opcode as running, not lock-waiting.
314
315     This is called from the mcpu code as a notifier function, when the
316     LU is finally about to start the Exec() method. Of course, to have
317     end-user visible results, the opcode must be initially (before
318     calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
319
320     """
321     assert self.queue, "Queue attribute is missing"
322     assert self.opcode, "Opcode attribute is missing"
323
324     self.queue.acquire()
325     try:
326       assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
327                                     constants.OP_STATUS_CANCELING)
328
329       # Cancel here if we were asked to
330       if self.opcode.status == constants.OP_STATUS_CANCELING:
331         raise CancelJob()
332
333       self.opcode.status = constants.OP_STATUS_RUNNING
334     finally:
335       self.queue.release()
336
337   def RunTask(self, job):
338     """Job executor.
339
340     This functions processes a job. It is closely tied to the _QueuedJob and
341     _QueuedOpCode classes.
342
343     @type job: L{_QueuedJob}
344     @param job: the job to be processed
345
346     """
347     logging.info("Worker %s processing job %s",
348                   self.worker_id, job.id)
349     proc = mcpu.Processor(self.pool.queue.context)
350     self.queue = queue = job.queue
351     try:
352       try:
353         count = len(job.ops)
354         for idx, op in enumerate(job.ops):
355           op_summary = op.input.Summary()
356           try:
357             logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
358                          op_summary)
359
360             queue.acquire()
361             try:
362               assert op.status == constants.OP_STATUS_QUEUED
363               job.run_op_index = idx
364               op.status = constants.OP_STATUS_WAITLOCK
365               op.result = None
366               op.start_timestamp = TimeStampNow()
367               if idx == 0: # first opcode
368                 job.start_timestamp = op.start_timestamp
369               queue.UpdateJobUnlocked(job)
370
371               input_opcode = op.input
372             finally:
373               queue.release()
374
375             def _Log(*args):
376               """Append a log entry.
377
378               """
379               assert len(args) < 3
380
381               if len(args) == 1:
382                 log_type = constants.ELOG_MESSAGE
383                 log_msg = args[0]
384               else:
385                 log_type, log_msg = args
386
387               # The time is split to make serialization easier and not lose
388               # precision.
389               timestamp = utils.SplitTime(time.time())
390
391               queue.acquire()
392               try:
393                 job.log_serial += 1
394                 op.log.append((job.log_serial, timestamp, log_type, log_msg))
395
396                 job.change.notifyAll()
397               finally:
398                 queue.release()
399
400             # Make sure not to hold lock while _Log is called
401             self.opcode = op
402             result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
403
404             queue.acquire()
405             try:
406               op.status = constants.OP_STATUS_SUCCESS
407               op.result = result
408               op.end_timestamp = TimeStampNow()
409               queue.UpdateJobUnlocked(job)
410             finally:
411               queue.release()
412
413             logging.info("Op %s/%s: Successfully finished opcode %s",
414                          idx + 1, count, op_summary)
415           except CancelJob:
416             # Will be handled further up
417             raise
418           except Exception, err:
419             queue.acquire()
420             try:
421               try:
422                 op.status = constants.OP_STATUS_ERROR
423                 op.result = str(err)
424                 op.end_timestamp = TimeStampNow()
425                 logging.info("Op %s/%s: Error in opcode %s", idx + 1, count,
426                              op_summary)
427               finally:
428                 queue.UpdateJobUnlocked(job)
429             finally:
430               queue.release()
431             raise
432
433       except CancelJob:
434         queue.acquire()
435         try:
436           queue.CancelJobUnlocked(job)
437         finally:
438           queue.release()
439       except errors.GenericError, err:
440         logging.exception("Ganeti exception")
441       except:
442         logging.exception("Unhandled exception")
443     finally:
444       queue.acquire()
445       try:
446         try:
447           job.run_op_idx = -1
448           job.end_timestamp = TimeStampNow()
449           queue.UpdateJobUnlocked(job)
450         finally:
451           job_id = job.id
452           status = job.CalcStatus()
453       finally:
454         queue.release()
455       logging.info("Worker %s finished job %s, status = %s",
456                    self.worker_id, job_id, status)
457
458
459 class _JobQueueWorkerPool(workerpool.WorkerPool):
460   """Simple class implementing a job-processing workerpool.
461
462   """
463   def __init__(self, queue):
464     super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
465                                               _JobQueueWorker)
466     self.queue = queue
467
468
469 class JobQueue(object):
470   """Quue used to manaage the jobs.
471
472   @cvar _RE_JOB_FILE: regex matching the valid job file names
473
474   """
475   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
476
477   def _RequireOpenQueue(fn):
478     """Decorator for "public" functions.
479
480     This function should be used for all 'public' functions. That is,
481     functions usually called from other classes.
482
483     @warning: Use this decorator only after utils.LockedMethod!
484
485     Example::
486       @utils.LockedMethod
487       @_RequireOpenQueue
488       def Example(self):
489         pass
490
491     """
492     def wrapper(self, *args, **kwargs):
493       assert self._queue_lock is not None, "Queue should be open"
494       return fn(self, *args, **kwargs)
495     return wrapper
496
497   def __init__(self, context):
498     """Constructor for JobQueue.
499
500     The constructor will initialize the job queue object and then
501     start loading the current jobs from disk, either for starting them
502     (if they were queue) or for aborting them (if they were already
503     running).
504
505     @type context: GanetiContext
506     @param context: the context object for access to the configuration
507         data and other ganeti objects
508
509     """
510     self.context = context
511     self._memcache = weakref.WeakValueDictionary()
512     self._my_hostname = utils.HostInfo().name
513
514     # Locking
515     self._lock = threading.Lock()
516     self.acquire = self._lock.acquire
517     self.release = self._lock.release
518
519     # Initialize
520     self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
521
522     # Read serial file
523     self._last_serial = jstore.ReadSerial()
524     assert self._last_serial is not None, ("Serial file was modified between"
525                                            " check in jstore and here")
526
527     # Get initial list of nodes
528     self._nodes = dict((n.name, n.primary_ip)
529                        for n in self.context.cfg.GetAllNodesInfo().values()
530                        if n.master_candidate)
531
532     # Remove master node
533     try:
534       del self._nodes[self._my_hostname]
535     except KeyError:
536       pass
537
538     # TODO: Check consistency across nodes
539
540     # Setup worker pool
541     self._wpool = _JobQueueWorkerPool(self)
542     try:
543       # We need to lock here because WorkerPool.AddTask() may start a job while
544       # we're still doing our work.
545       self.acquire()
546       try:
547         logging.info("Inspecting job queue")
548
549         all_job_ids = self._GetJobIDsUnlocked()
550         jobs_count = len(all_job_ids)
551         lastinfo = time.time()
552         for idx, job_id in enumerate(all_job_ids):
553           # Give an update every 1000 jobs or 10 seconds
554           if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
555               idx == (jobs_count - 1)):
556             logging.info("Job queue inspection: %d/%d (%0.1f %%)",
557                          idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
558             lastinfo = time.time()
559
560           job = self._LoadJobUnlocked(job_id)
561
562           # a failure in loading the job can cause 'None' to be returned
563           if job is None:
564             continue
565
566           status = job.CalcStatus()
567
568           if status in (constants.JOB_STATUS_QUEUED, ):
569             self._wpool.AddTask(job)
570
571           elif status in (constants.JOB_STATUS_RUNNING,
572                           constants.JOB_STATUS_WAITLOCK,
573                           constants.JOB_STATUS_CANCELING):
574             logging.warning("Unfinished job %s found: %s", job.id, job)
575             try:
576               for op in job.ops:
577                 op.status = constants.OP_STATUS_ERROR
578                 op.result = "Unclean master daemon shutdown"
579             finally:
580               self.UpdateJobUnlocked(job)
581
582         logging.info("Job queue inspection finished")
583       finally:
584         self.release()
585     except:
586       self._wpool.TerminateWorkers()
587       raise
588
589   @utils.LockedMethod
590   @_RequireOpenQueue
591   def AddNode(self, node):
592     """Register a new node with the queue.
593
594     @type node: L{objects.Node}
595     @param node: the node object to be added
596
597     """
598     node_name = node.name
599     assert node_name != self._my_hostname
600
601     # Clean queue directory on added node
602     rpc.RpcRunner.call_jobqueue_purge(node_name)
603
604     if not node.master_candidate:
605       # remove if existing, ignoring errors
606       self._nodes.pop(node_name, None)
607       # and skip the replication of the job ids
608       return
609
610     # Upload the whole queue excluding archived jobs
611     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
612
613     # Upload current serial file
614     files.append(constants.JOB_QUEUE_SERIAL_FILE)
615
616     for file_name in files:
617       # Read file content
618       fd = open(file_name, "r")
619       try:
620         content = fd.read()
621       finally:
622         fd.close()
623
624       result = rpc.RpcRunner.call_jobqueue_update([node_name],
625                                                   [node.primary_ip],
626                                                   file_name, content)
627       if not result[node_name]:
628         logging.error("Failed to upload %s to %s", file_name, node_name)
629
630     self._nodes[node_name] = node.primary_ip
631
632   @utils.LockedMethod
633   @_RequireOpenQueue
634   def RemoveNode(self, node_name):
635     """Callback called when removing nodes from the cluster.
636
637     @type node_name: str
638     @param node_name: the name of the node to remove
639
640     """
641     try:
642       # The queue is removed by the "leave node" RPC call.
643       del self._nodes[node_name]
644     except KeyError:
645       pass
646
647   def _CheckRpcResult(self, result, nodes, failmsg):
648     """Verifies the status of an RPC call.
649
650     Since we aim to keep consistency should this node (the current
651     master) fail, we will log errors if our rpc fail, and especially
652     log the case when more than half of the nodes failes.
653
654     @param result: the data as returned from the rpc call
655     @type nodes: list
656     @param nodes: the list of nodes we made the call to
657     @type failmsg: str
658     @param failmsg: the identifier to be used for logging
659
660     """
661     failed = []
662     success = []
663
664     for node in nodes:
665       if result[node]:
666         success.append(node)
667       else:
668         failed.append(node)
669
670     if failed:
671       logging.error("%s failed on %s", failmsg, ", ".join(failed))
672
673     # +1 for the master node
674     if (len(success) + 1) < len(failed):
675       # TODO: Handle failing nodes
676       logging.error("More than half of the nodes failed")
677
678   def _GetNodeIp(self):
679     """Helper for returning the node name/ip list.
680
681     @rtype: (list, list)
682     @return: a tuple of two lists, the first one with the node
683         names and the second one with the node addresses
684
685     """
686     name_list = self._nodes.keys()
687     addr_list = [self._nodes[name] for name in name_list]
688     return name_list, addr_list
689
690   def _WriteAndReplicateFileUnlocked(self, file_name, data):
691     """Writes a file locally and then replicates it to all nodes.
692
693     This function will replace the contents of a file on the local
694     node and then replicate it to all the other nodes we have.
695
696     @type file_name: str
697     @param file_name: the path of the file to be replicated
698     @type data: str
699     @param data: the new contents of the file
700
701     """
702     utils.WriteFile(file_name, data=data)
703
704     names, addrs = self._GetNodeIp()
705     result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
706     self._CheckRpcResult(result, self._nodes,
707                          "Updating %s" % file_name)
708
709   def _RenameFilesUnlocked(self, rename):
710     """Renames a file locally and then replicate the change.
711
712     This function will rename a file in the local queue directory
713     and then replicate this rename to all the other nodes we have.
714
715     @type rename: list of (old, new)
716     @param rename: List containing tuples mapping old to new names
717
718     """
719     # Rename them locally
720     for old, new in rename:
721       utils.RenameFile(old, new, mkdir=True)
722
723     # ... and on all nodes
724     names, addrs = self._GetNodeIp()
725     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
726     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
727
728   def _FormatJobID(self, job_id):
729     """Convert a job ID to string format.
730
731     Currently this just does C{str(job_id)} after performing some
732     checks, but if we want to change the job id format this will
733     abstract this change.
734
735     @type job_id: int or long
736     @param job_id: the numeric job id
737     @rtype: str
738     @return: the formatted job id
739
740     """
741     if not isinstance(job_id, (int, long)):
742       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
743     if job_id < 0:
744       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
745
746     return str(job_id)
747
748   @classmethod
749   def _GetArchiveDirectory(cls, job_id):
750     """Returns the archive directory for a job.
751
752     @type job_id: str
753     @param job_id: Job identifier
754     @rtype: str
755     @return: Directory name
756
757     """
758     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
759
760   def _NewSerialUnlocked(self):
761     """Generates a new job identifier.
762
763     Job identifiers are unique during the lifetime of a cluster.
764
765     @rtype: str
766     @return: a string representing the job identifier.
767
768     """
769     # New number
770     serial = self._last_serial + 1
771
772     # Write to file
773     self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
774                                         "%s\n" % serial)
775
776     # Keep it only if we were able to write the file
777     self._last_serial = serial
778
779     return self._FormatJobID(serial)
780
781   @staticmethod
782   def _GetJobPath(job_id):
783     """Returns the job file for a given job id.
784
785     @type job_id: str
786     @param job_id: the job identifier
787     @rtype: str
788     @return: the path to the job file
789
790     """
791     return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
792
793   @classmethod
794   def _GetArchivedJobPath(cls, job_id):
795     """Returns the archived job file for a give job id.
796
797     @type job_id: str
798     @param job_id: the job identifier
799     @rtype: str
800     @return: the path to the archived job file
801
802     """
803     path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
804     return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
805
806   @classmethod
807   def _ExtractJobID(cls, name):
808     """Extract the job id from a filename.
809
810     @type name: str
811     @param name: the job filename
812     @rtype: job id or None
813     @return: the job id corresponding to the given filename,
814         or None if the filename does not represent a valid
815         job file
816
817     """
818     m = cls._RE_JOB_FILE.match(name)
819     if m:
820       return m.group(1)
821     else:
822       return None
823
824   def _GetJobIDsUnlocked(self, archived=False):
825     """Return all known job IDs.
826
827     If the parameter archived is True, archived jobs IDs will be
828     included. Currently this argument is unused.
829
830     The method only looks at disk because it's a requirement that all
831     jobs are present on disk (so in the _memcache we don't have any
832     extra IDs).
833
834     @rtype: list
835     @return: the list of job IDs
836
837     """
838     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
839     jlist = utils.NiceSort(jlist)
840     return jlist
841
842   def _ListJobFiles(self):
843     """Returns the list of current job files.
844
845     @rtype: list
846     @return: the list of job file names
847
848     """
849     return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
850             if self._RE_JOB_FILE.match(name)]
851
852   def _LoadJobUnlocked(self, job_id):
853     """Loads a job from the disk or memory.
854
855     Given a job id, this will return the cached job object if
856     existing, or try to load the job from the disk. If loading from
857     disk, it will also add the job to the cache.
858
859     @param job_id: the job id
860     @rtype: L{_QueuedJob} or None
861     @return: either None or the job object
862
863     """
864     job = self._memcache.get(job_id, None)
865     if job:
866       logging.debug("Found job %s in memcache", job_id)
867       return job
868
869     filepath = self._GetJobPath(job_id)
870     logging.debug("Loading job from %s", filepath)
871     try:
872       fd = open(filepath, "r")
873     except IOError, err:
874       if err.errno in (errno.ENOENT, ):
875         return None
876       raise
877     try:
878       data = serializer.LoadJson(fd.read())
879     finally:
880       fd.close()
881
882     try:
883       job = _QueuedJob.Restore(self, data)
884     except Exception, err:
885       new_path = self._GetArchivedJobPath(job_id)
886       if filepath == new_path:
887         # job already archived (future case)
888         logging.exception("Can't parse job %s", job_id)
889       else:
890         # non-archived case
891         logging.exception("Can't parse job %s, will archive.", job_id)
892         self._RenameFilesUnlocked([(filepath, new_path)])
893       return None
894
895     self._memcache[job_id] = job
896     logging.debug("Added job %s to the cache", job_id)
897     return job
898
899   def _GetJobsUnlocked(self, job_ids):
900     """Return a list of jobs based on their IDs.
901
902     @type job_ids: list
903     @param job_ids: either an empty list (meaning all jobs),
904         or a list of job IDs
905     @rtype: list
906     @return: the list of job objects
907
908     """
909     if not job_ids:
910       job_ids = self._GetJobIDsUnlocked()
911
912     return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
913
914   @staticmethod
915   def _IsQueueMarkedDrain():
916     """Check if the queue is marked from drain.
917
918     This currently uses the queue drain file, which makes it a
919     per-node flag. In the future this can be moved to the config file.
920
921     @rtype: boolean
922     @return: True of the job queue is marked for draining
923
924     """
925     return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
926
927   @staticmethod
928   def SetDrainFlag(drain_flag):
929     """Sets the drain flag for the queue.
930
931     This is similar to the function L{backend.JobQueueSetDrainFlag},
932     and in the future we might merge them.
933
934     @type drain_flag: boolean
935     @param drain_flag: wheter to set or unset the drain flag
936
937     """
938     if drain_flag:
939       utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
940     else:
941       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
942     return True
943
944   @utils.LockedMethod
945   @_RequireOpenQueue
946   def SubmitJob(self, ops):
947     """Create and store a new job.
948
949     This enters the job into our job queue and also puts it on the new
950     queue, in order for it to be picked up by the queue processors.
951
952     @type ops: list
953     @param ops: The list of OpCodes that will become the new job.
954     @rtype: job ID
955     @return: the job ID of the newly created job
956     @raise errors.JobQueueDrainError: if the job is marked for draining
957
958     """
959     if self._IsQueueMarkedDrain():
960       raise errors.JobQueueDrainError()
961
962     # Check job queue size
963     size = len(self._ListJobFiles())
964     if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
965       # TODO: Autoarchive jobs. Make sure it's not done on every job
966       # submission, though.
967       #size = ...
968       pass
969
970     if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
971       raise errors.JobQueueFull()
972
973     # Get job identifier
974     job_id = self._NewSerialUnlocked()
975     job = _QueuedJob(self, job_id, ops)
976
977     # Write to disk
978     self.UpdateJobUnlocked(job)
979
980     logging.debug("Adding new job %s to the cache", job_id)
981     self._memcache[job_id] = job
982
983     # Add to worker pool
984     self._wpool.AddTask(job)
985
986     return job.id
987
988   @_RequireOpenQueue
989   def UpdateJobUnlocked(self, job):
990     """Update a job's on disk storage.
991
992     After a job has been modified, this function needs to be called in
993     order to write the changes to disk and replicate them to the other
994     nodes.
995
996     @type job: L{_QueuedJob}
997     @param job: the changed job
998
999     """
1000     filename = self._GetJobPath(job.id)
1001     data = serializer.DumpJson(job.Serialize(), indent=False)
1002     logging.debug("Writing job %s to %s", job.id, filename)
1003     self._WriteAndReplicateFileUnlocked(filename, data)
1004
1005     # Notify waiters about potential changes
1006     job.change.notifyAll()
1007
1008   @utils.LockedMethod
1009   @_RequireOpenQueue
1010   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1011                         timeout):
1012     """Waits for changes in a job.
1013
1014     @type job_id: string
1015     @param job_id: Job identifier
1016     @type fields: list of strings
1017     @param fields: Which fields to check for changes
1018     @type prev_job_info: list or None
1019     @param prev_job_info: Last job information returned
1020     @type prev_log_serial: int
1021     @param prev_log_serial: Last job message serial number
1022     @type timeout: float
1023     @param timeout: maximum time to wait
1024     @rtype: tuple (job info, log entries)
1025     @return: a tuple of the job information as required via
1026         the fields parameter, and the log entries as a list
1027
1028         if the job has not changed and the timeout has expired,
1029         we instead return a special value,
1030         L{constants.JOB_NOTCHANGED}, which should be interpreted
1031         as such by the clients
1032
1033     """
1034     logging.debug("Waiting for changes in job %s", job_id)
1035     end_time = time.time() + timeout
1036     while True:
1037       delta_time = end_time - time.time()
1038       if delta_time < 0:
1039         return constants.JOB_NOTCHANGED
1040
1041       job = self._LoadJobUnlocked(job_id)
1042       if not job:
1043         logging.debug("Job %s not found", job_id)
1044         break
1045
1046       status = job.CalcStatus()
1047       job_info = self._GetJobInfoUnlocked(job, fields)
1048       log_entries = job.GetLogEntries(prev_log_serial)
1049
1050       # Serializing and deserializing data can cause type changes (e.g. from
1051       # tuple to list) or precision loss. We're doing it here so that we get
1052       # the same modifications as the data received from the client. Without
1053       # this, the comparison afterwards might fail without the data being
1054       # significantly different.
1055       job_info = serializer.LoadJson(serializer.DumpJson(job_info))
1056       log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
1057
1058       if status not in (constants.JOB_STATUS_QUEUED,
1059                         constants.JOB_STATUS_RUNNING,
1060                         constants.JOB_STATUS_WAITLOCK):
1061         # Don't even try to wait if the job is no longer running, there will be
1062         # no changes.
1063         break
1064
1065       if (prev_job_info != job_info or
1066           (log_entries and prev_log_serial != log_entries[0][0])):
1067         break
1068
1069       logging.debug("Waiting again")
1070
1071       # Release the queue lock while waiting
1072       job.change.wait(delta_time)
1073
1074     logging.debug("Job %s changed", job_id)
1075
1076     return (job_info, log_entries)
1077
1078   @utils.LockedMethod
1079   @_RequireOpenQueue
1080   def CancelJob(self, job_id):
1081     """Cancels a job.
1082
1083     This will only succeed if the job has not started yet.
1084
1085     @type job_id: string
1086     @param job_id: job ID of job to be cancelled.
1087
1088     """
1089     logging.info("Cancelling job %s", job_id)
1090
1091     job = self._LoadJobUnlocked(job_id)
1092     if not job:
1093       logging.debug("Job %s not found", job_id)
1094       return (False, "Job %s not found" % job_id)
1095
1096     job_status = job.CalcStatus()
1097
1098     if job_status not in (constants.JOB_STATUS_QUEUED,
1099                           constants.JOB_STATUS_WAITLOCK):
1100       logging.debug("Job %s is no longer in the queue", job.id)
1101       return (False, "Job %s is no longer in the queue" % job.id)
1102
1103     if job_status == constants.JOB_STATUS_QUEUED:
1104       self.CancelJobUnlocked(job)
1105       return (True, "Job %s canceled" % job.id)
1106
1107     elif job_status == constants.JOB_STATUS_WAITLOCK:
1108       # The worker will notice the new status and cancel the job
1109       try:
1110         for op in job.ops:
1111           op.status = constants.OP_STATUS_CANCELING
1112       finally:
1113         self.UpdateJobUnlocked(job)
1114       return (True, "Job %s will be canceled" % job.id)
1115
1116   @_RequireOpenQueue
1117   def CancelJobUnlocked(self, job):
1118     """Marks a job as canceled.
1119
1120     """
1121     try:
1122       for op in job.ops:
1123         op.status = constants.OP_STATUS_ERROR
1124         op.result = "Job canceled by request"
1125     finally:
1126       self.UpdateJobUnlocked(job)
1127
1128   @_RequireOpenQueue
1129   def _ArchiveJobsUnlocked(self, jobs):
1130     """Archives jobs.
1131
1132     @type jobs: list of L{_QueuedJob}
1133     @param jobs: Job objects
1134     @rtype: int
1135     @return: Number of archived jobs
1136
1137     """
1138     archive_jobs = []
1139     rename_files = []
1140     for job in jobs:
1141       if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1142                                   constants.JOB_STATUS_SUCCESS,
1143                                   constants.JOB_STATUS_ERROR):
1144         logging.debug("Job %s is not yet done", job.id)
1145         continue
1146
1147       archive_jobs.append(job)
1148
1149       old = self._GetJobPath(job.id)
1150       new = self._GetArchivedJobPath(job.id)
1151       rename_files.append((old, new))
1152
1153     # TODO: What if 1..n files fail to rename?
1154     self._RenameFilesUnlocked(rename_files)
1155
1156     logging.debug("Successfully archived job(s) %s",
1157                   ", ".join(job.id for job in archive_jobs))
1158
1159     return len(archive_jobs)
1160
1161   @utils.LockedMethod
1162   @_RequireOpenQueue
1163   def ArchiveJob(self, job_id):
1164     """Archives a job.
1165
1166     This is just a wrapper over L{_ArchiveJobsUnlocked}.
1167
1168     @type job_id: string
1169     @param job_id: Job ID of job to be archived.
1170     @rtype: bool
1171     @return: Whether job was archived
1172
1173     """
1174     logging.info("Archiving job %s", job_id)
1175
1176     job = self._LoadJobUnlocked(job_id)
1177     if not job:
1178       logging.debug("Job %s not found", job_id)
1179       return False
1180
1181     return self._ArchiveJobUnlocked([job]) == 1
1182
1183   @utils.LockedMethod
1184   @_RequireOpenQueue
1185   def AutoArchiveJobs(self, age, timeout):
1186     """Archives all jobs based on age.
1187
1188     The method will archive all jobs which are older than the age
1189     parameter. For jobs that don't have an end timestamp, the start
1190     timestamp will be considered. The special '-1' age will cause
1191     archival of all jobs (that are not running or queued).
1192
1193     @type age: int
1194     @param age: the minimum age in seconds
1195
1196     """
1197     logging.info("Archiving jobs with age more than %s seconds", age)
1198
1199     now = time.time()
1200     end_time = now + timeout
1201     archived_count = 0
1202     last_touched = 0
1203
1204     all_job_ids = self._GetJobIDsUnlocked(archived=False)
1205     pending = []
1206     for idx, job_id in enumerate(all_job_ids):
1207       last_touched = idx
1208
1209       # Not optimal because jobs could be pending
1210       # TODO: Measure average duration for job archival and take number of
1211       # pending jobs into account.
1212       if time.time() > end_time:
1213         break
1214
1215       # Returns None if the job failed to load
1216       job = self._LoadJobUnlocked(job_id)
1217       if job:
1218         if job.end_timestamp is None:
1219           if job.start_timestamp is None:
1220             job_age = job.received_timestamp
1221           else:
1222             job_age = job.start_timestamp
1223         else:
1224           job_age = job.end_timestamp
1225
1226         if age == -1 or now - job_age[0] > age:
1227           pending.append(job)
1228
1229           # Archive 10 jobs at a time
1230           if len(pending) >= 10:
1231             archived_count += self._ArchiveJobsUnlocked(pending)
1232             pending = []
1233
1234     if pending:
1235       archived_count += self._ArchiveJobsUnlocked(pending)
1236
1237     return (archived_count, len(all_job_ids) - last_touched - 1)
1238
1239   def _GetJobInfoUnlocked(self, job, fields):
1240     """Returns information about a job.
1241
1242     @type job: L{_QueuedJob}
1243     @param job: the job which we query
1244     @type fields: list
1245     @param fields: names of fields to return
1246     @rtype: list
1247     @return: list with one element for each field
1248     @raise errors.OpExecError: when an invalid field
1249         has been passed
1250
1251     """
1252     row = []
1253     for fname in fields:
1254       if fname == "id":
1255         row.append(job.id)
1256       elif fname == "status":
1257         row.append(job.CalcStatus())
1258       elif fname == "ops":
1259         row.append([op.input.__getstate__() for op in job.ops])
1260       elif fname == "opresult":
1261         row.append([op.result for op in job.ops])
1262       elif fname == "opstatus":
1263         row.append([op.status for op in job.ops])
1264       elif fname == "oplog":
1265         row.append([op.log for op in job.ops])
1266       elif fname == "opstart":
1267         row.append([op.start_timestamp for op in job.ops])
1268       elif fname == "opend":
1269         row.append([op.end_timestamp for op in job.ops])
1270       elif fname == "received_ts":
1271         row.append(job.received_timestamp)
1272       elif fname == "start_ts":
1273         row.append(job.start_timestamp)
1274       elif fname == "end_ts":
1275         row.append(job.end_timestamp)
1276       elif fname == "summary":
1277         row.append([op.input.Summary() for op in job.ops])
1278       else:
1279         raise errors.OpExecError("Invalid job query field '%s'" % fname)
1280     return row
1281
1282   @utils.LockedMethod
1283   @_RequireOpenQueue
1284   def QueryJobs(self, job_ids, fields):
1285     """Returns a list of jobs in queue.
1286
1287     This is a wrapper of L{_GetJobsUnlocked}, which actually does the
1288     processing for each job.
1289
1290     @type job_ids: list
1291     @param job_ids: sequence of job identifiers or None for all
1292     @type fields: list
1293     @param fields: names of fields to return
1294     @rtype: list
1295     @return: list one element per job, each element being list with
1296         the requested fields
1297
1298     """
1299     jobs = []
1300
1301     for job in self._GetJobsUnlocked(job_ids):
1302       if job is None:
1303         jobs.append(None)
1304       else:
1305         jobs.append(self._GetJobInfoUnlocked(job, fields))
1306
1307     return jobs
1308
1309   @utils.LockedMethod
1310   @_RequireOpenQueue
1311   def Shutdown(self):
1312     """Stops the job queue.
1313
1314     This shutdowns all the worker threads an closes the queue.
1315
1316     """
1317     self._wpool.TerminateWorkers()
1318
1319     self._queue_lock.Close()
1320     self._queue_lock = None