b3c7dbb316c8e5c7ee28ebf90f41b3ea884ccef3
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import os
33 import logging
34 import threading
35 import errno
36 import re
37 import time
38 import weakref
39
40 from ganeti import constants
41 from ganeti import serializer
42 from ganeti import workerpool
43 from ganeti import opcodes
44 from ganeti import errors
45 from ganeti import mcpu
46 from ganeti import utils
47 from ganeti import jstore
48 from ganeti import rpc
49
50
51 JOBQUEUE_THREADS = 25
52
53
54 class CancelJob:
55   """Special exception to cancel a job.
56
57   """
58
59
60 def TimeStampNow():
61   """Returns the current timestamp.
62
63   @rtype: tuple
64   @return: the current time in the (seconds, microseconds) format
65
66   """
67   return utils.SplitTime(time.time())
68
69
70 class _QueuedOpCode(object):
71   """Encasulates an opcode object.
72
73   @ivar log: holds the execution log and consists of tuples
74   of the form C{(log_serial, timestamp, level, message)}
75   @ivar input: the OpCode we encapsulate
76   @ivar status: the current status
77   @ivar result: the result of the LU execution
78   @ivar start_timestamp: timestamp for the start of the execution
79   @ivar stop_timestamp: timestamp for the end of the execution
80
81   """
82   def __init__(self, op):
83     """Constructor for the _QuededOpCode.
84
85     @type op: L{opcodes.OpCode}
86     @param op: the opcode we encapsulate
87
88     """
89     self.input = op
90     self.status = constants.OP_STATUS_QUEUED
91     self.result = None
92     self.log = []
93     self.start_timestamp = None
94     self.end_timestamp = None
95
96   @classmethod
97   def Restore(cls, state):
98     """Restore the _QueuedOpCode from the serialized form.
99
100     @type state: dict
101     @param state: the serialized state
102     @rtype: _QueuedOpCode
103     @return: a new _QueuedOpCode instance
104
105     """
106     obj = _QueuedOpCode.__new__(cls)
107     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
108     obj.status = state["status"]
109     obj.result = state["result"]
110     obj.log = state["log"]
111     obj.start_timestamp = state.get("start_timestamp", None)
112     obj.end_timestamp = state.get("end_timestamp", None)
113     return obj
114
115   def Serialize(self):
116     """Serializes this _QueuedOpCode.
117
118     @rtype: dict
119     @return: the dictionary holding the serialized state
120
121     """
122     return {
123       "input": self.input.__getstate__(),
124       "status": self.status,
125       "result": self.result,
126       "log": self.log,
127       "start_timestamp": self.start_timestamp,
128       "end_timestamp": self.end_timestamp,
129       }
130
131
132 class _QueuedJob(object):
133   """In-memory job representation.
134
135   This is what we use to track the user-submitted jobs. Locking must
136   be taken care of by users of this class.
137
138   @type queue: L{JobQueue}
139   @ivar queue: the parent queue
140   @ivar id: the job ID
141   @type ops: list
142   @ivar ops: the list of _QueuedOpCode that constitute the job
143   @type run_op_index: int
144   @ivar run_op_index: the currently executing opcode, or -1 if
145       we didn't yet start executing
146   @type log_serial: int
147   @ivar log_serial: holds the index for the next log entry
148   @ivar received_timestamp: the timestamp for when the job was received
149   @ivar start_timestmap: the timestamp for start of execution
150   @ivar end_timestamp: the timestamp for end of execution
151   @ivar change: a Condition variable we use for waiting for job changes
152
153   """
154   def __init__(self, queue, job_id, ops):
155     """Constructor for the _QueuedJob.
156
157     @type queue: L{JobQueue}
158     @param queue: our parent queue
159     @type job_id: job_id
160     @param job_id: our job id
161     @type ops: list
162     @param ops: the list of opcodes we hold, which will be encapsulated
163         in _QueuedOpCodes
164
165     """
166     if not ops:
167       # TODO: use a better exception
168       raise Exception("No opcodes")
169
170     self.queue = queue
171     self.id = job_id
172     self.ops = [_QueuedOpCode(op) for op in ops]
173     self.run_op_index = -1
174     self.log_serial = 0
175     self.received_timestamp = TimeStampNow()
176     self.start_timestamp = None
177     self.end_timestamp = None
178
179     # Condition to wait for changes
180     self.change = threading.Condition(self.queue._lock)
181
182   @classmethod
183   def Restore(cls, queue, state):
184     """Restore a _QueuedJob from serialized state:
185
186     @type queue: L{JobQueue}
187     @param queue: to which queue the restored job belongs
188     @type state: dict
189     @param state: the serialized state
190     @rtype: _JobQueue
191     @return: the restored _JobQueue instance
192
193     """
194     obj = _QueuedJob.__new__(cls)
195     obj.queue = queue
196     obj.id = state["id"]
197     obj.run_op_index = state["run_op_index"]
198     obj.received_timestamp = state.get("received_timestamp", None)
199     obj.start_timestamp = state.get("start_timestamp", None)
200     obj.end_timestamp = state.get("end_timestamp", None)
201
202     obj.ops = []
203     obj.log_serial = 0
204     for op_state in state["ops"]:
205       op = _QueuedOpCode.Restore(op_state)
206       for log_entry in op.log:
207         obj.log_serial = max(obj.log_serial, log_entry[0])
208       obj.ops.append(op)
209
210     # Condition to wait for changes
211     obj.change = threading.Condition(obj.queue._lock)
212
213     return obj
214
215   def Serialize(self):
216     """Serialize the _JobQueue instance.
217
218     @rtype: dict
219     @return: the serialized state
220
221     """
222     return {
223       "id": self.id,
224       "ops": [op.Serialize() for op in self.ops],
225       "run_op_index": self.run_op_index,
226       "start_timestamp": self.start_timestamp,
227       "end_timestamp": self.end_timestamp,
228       "received_timestamp": self.received_timestamp,
229       }
230
231   def CalcStatus(self):
232     """Compute the status of this job.
233
234     This function iterates over all the _QueuedOpCodes in the job and
235     based on their status, computes the job status.
236
237     The algorithm is:
238       - if we find a cancelled, or finished with error, the job
239         status will be the same
240       - otherwise, the last opcode with the status one of:
241           - waitlock
242           - canceling
243           - running
244
245         will determine the job status
246
247       - otherwise, it means either all opcodes are queued, or success,
248         and the job status will be the same
249
250     @return: the job status
251
252     """
253     status = constants.JOB_STATUS_QUEUED
254
255     all_success = True
256     for op in self.ops:
257       if op.status == constants.OP_STATUS_SUCCESS:
258         continue
259
260       all_success = False
261
262       if op.status == constants.OP_STATUS_QUEUED:
263         pass
264       elif op.status == constants.OP_STATUS_WAITLOCK:
265         status = constants.JOB_STATUS_WAITLOCK
266       elif op.status == constants.OP_STATUS_RUNNING:
267         status = constants.JOB_STATUS_RUNNING
268       elif op.status == constants.OP_STATUS_CANCELING:
269         status = constants.JOB_STATUS_CANCELING
270         break
271       elif op.status == constants.OP_STATUS_ERROR:
272         status = constants.JOB_STATUS_ERROR
273         # The whole job fails if one opcode failed
274         break
275       elif op.status == constants.OP_STATUS_CANCELED:
276         status = constants.OP_STATUS_CANCELED
277         break
278
279     if all_success:
280       status = constants.JOB_STATUS_SUCCESS
281
282     return status
283
284   def GetLogEntries(self, newer_than):
285     """Selectively returns the log entries.
286
287     @type newer_than: None or int
288     @param newer_than: if this is None, return all log enties,
289         otherwise return only the log entries with serial higher
290         than this value
291     @rtype: list
292     @return: the list of the log entries selected
293
294     """
295     if newer_than is None:
296       serial = -1
297     else:
298       serial = newer_than
299
300     entries = []
301     for op in self.ops:
302       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
303
304     return entries
305
306
307 class _JobQueueWorker(workerpool.BaseWorker):
308   """The actual job workers.
309
310   """
311   def _NotifyStart(self):
312     """Mark the opcode as running, not lock-waiting.
313
314     This is called from the mcpu code as a notifier function, when the
315     LU is finally about to start the Exec() method. Of course, to have
316     end-user visible results, the opcode must be initially (before
317     calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
318
319     """
320     assert self.queue, "Queue attribute is missing"
321     assert self.opcode, "Opcode attribute is missing"
322
323     self.queue.acquire()
324     try:
325       assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
326                                     constants.OP_STATUS_CANCELING)
327
328       # Cancel here if we were asked to
329       if self.opcode.status == constants.OP_STATUS_CANCELING:
330         raise CancelJob()
331
332       self.opcode.status = constants.OP_STATUS_RUNNING
333     finally:
334       self.queue.release()
335
336   def RunTask(self, job):
337     """Job executor.
338
339     This functions processes a job. It is closely tied to the _QueuedJob and
340     _QueuedOpCode classes.
341
342     @type job: L{_QueuedJob}
343     @param job: the job to be processed
344
345     """
346     logging.debug("Worker %s processing job %s",
347                   self.worker_id, job.id)
348     proc = mcpu.Processor(self.pool.queue.context)
349     self.queue = queue = job.queue
350     try:
351       try:
352         count = len(job.ops)
353         for idx, op in enumerate(job.ops):
354           try:
355             logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
356
357             queue.acquire()
358             try:
359               assert op.status == constants.OP_STATUS_QUEUED
360               job.run_op_index = idx
361               op.status = constants.OP_STATUS_WAITLOCK
362               op.result = None
363               op.start_timestamp = TimeStampNow()
364               if idx == 0: # first opcode
365                 job.start_timestamp = op.start_timestamp
366               queue.UpdateJobUnlocked(job)
367
368               input_opcode = op.input
369             finally:
370               queue.release()
371
372             def _Log(*args):
373               """Append a log entry.
374
375               """
376               assert len(args) < 3
377
378               if len(args) == 1:
379                 log_type = constants.ELOG_MESSAGE
380                 log_msg = args[0]
381               else:
382                 log_type, log_msg = args
383
384               # The time is split to make serialization easier and not lose
385               # precision.
386               timestamp = utils.SplitTime(time.time())
387
388               queue.acquire()
389               try:
390                 job.log_serial += 1
391                 op.log.append((job.log_serial, timestamp, log_type, log_msg))
392
393                 job.change.notifyAll()
394               finally:
395                 queue.release()
396
397             # Make sure not to hold lock while _Log is called
398             self.opcode = op
399             result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
400
401             queue.acquire()
402             try:
403               op.status = constants.OP_STATUS_SUCCESS
404               op.result = result
405               op.end_timestamp = TimeStampNow()
406               queue.UpdateJobUnlocked(job)
407             finally:
408               queue.release()
409
410             logging.debug("Op %s/%s: Successfully finished %s",
411                           idx + 1, count, op)
412           except CancelJob:
413             # Will be handled further up
414             raise
415           except Exception, err:
416             queue.acquire()
417             try:
418               try:
419                 op.status = constants.OP_STATUS_ERROR
420                 op.result = str(err)
421                 op.end_timestamp = TimeStampNow()
422                 logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
423               finally:
424                 queue.UpdateJobUnlocked(job)
425             finally:
426               queue.release()
427             raise
428
429       except CancelJob:
430         queue.acquire()
431         try:
432           queue.CancelJobUnlocked(job)
433         finally:
434           queue.release()
435       except errors.GenericError, err:
436         logging.exception("Ganeti exception")
437       except:
438         logging.exception("Unhandled exception")
439     finally:
440       queue.acquire()
441       try:
442         try:
443           job.run_op_idx = -1
444           job.end_timestamp = TimeStampNow()
445           queue.UpdateJobUnlocked(job)
446         finally:
447           job_id = job.id
448           status = job.CalcStatus()
449       finally:
450         queue.release()
451       logging.debug("Worker %s finished job %s, status = %s",
452                     self.worker_id, job_id, status)
453
454
455 class _JobQueueWorkerPool(workerpool.WorkerPool):
456   """Simple class implementing a job-processing workerpool.
457
458   """
459   def __init__(self, queue):
460     super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
461                                               _JobQueueWorker)
462     self.queue = queue
463
464
465 class JobQueue(object):
466   """Quue used to manaage the jobs.
467
468   @cvar _RE_JOB_FILE: regex matching the valid job file names
469
470   """
471   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
472
473   def _RequireOpenQueue(fn):
474     """Decorator for "public" functions.
475
476     This function should be used for all 'public' functions. That is,
477     functions usually called from other classes.
478
479     @warning: Use this decorator only after utils.LockedMethod!
480
481     Example::
482       @utils.LockedMethod
483       @_RequireOpenQueue
484       def Example(self):
485         pass
486
487     """
488     def wrapper(self, *args, **kwargs):
489       assert self._queue_lock is not None, "Queue should be open"
490       return fn(self, *args, **kwargs)
491     return wrapper
492
493   def __init__(self, context):
494     """Constructor for JobQueue.
495
496     The constructor will initialize the job queue object and then
497     start loading the current jobs from disk, either for starting them
498     (if they were queue) or for aborting them (if they were already
499     running).
500
501     @type context: GanetiContext
502     @param context: the context object for access to the configuration
503         data and other ganeti objects
504
505     """
506     self.context = context
507     self._memcache = weakref.WeakValueDictionary()
508     self._my_hostname = utils.HostInfo().name
509
510     # Locking
511     self._lock = threading.Lock()
512     self.acquire = self._lock.acquire
513     self.release = self._lock.release
514
515     # Initialize
516     self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
517
518     # Read serial file
519     self._last_serial = jstore.ReadSerial()
520     assert self._last_serial is not None, ("Serial file was modified between"
521                                            " check in jstore and here")
522
523     # Get initial list of nodes
524     self._nodes = dict((n.name, n.primary_ip)
525                        for n in self.context.cfg.GetAllNodesInfo().values()
526                        if n.master_candidate)
527
528     # Remove master node
529     try:
530       del self._nodes[self._my_hostname]
531     except KeyError:
532       pass
533
534     # TODO: Check consistency across nodes
535
536     # Setup worker pool
537     self._wpool = _JobQueueWorkerPool(self)
538     try:
539       # We need to lock here because WorkerPool.AddTask() may start a job while
540       # we're still doing our work.
541       self.acquire()
542       try:
543         logging.info("Inspecting job queue")
544
545         all_job_ids = self._GetJobIDsUnlocked()
546         jobs_count = len(all_job_ids)
547         lastinfo = time.time()
548         for idx, job_id in enumerate(all_job_ids):
549           # Give an update every 1000 jobs or 10 seconds
550           if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
551               idx == (jobs_count - 1)):
552             logging.info("Job queue inspection: %d/%d (%0.1f %%)",
553                          idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
554             lastinfo = time.time()
555
556           job = self._LoadJobUnlocked(job_id)
557
558           # a failure in loading the job can cause 'None' to be returned
559           if job is None:
560             continue
561
562           status = job.CalcStatus()
563
564           if status in (constants.JOB_STATUS_QUEUED, ):
565             self._wpool.AddTask(job)
566
567           elif status in (constants.JOB_STATUS_RUNNING,
568                           constants.JOB_STATUS_WAITLOCK,
569                           constants.JOB_STATUS_CANCELING):
570             logging.warning("Unfinished job %s found: %s", job.id, job)
571             try:
572               for op in job.ops:
573                 op.status = constants.OP_STATUS_ERROR
574                 op.result = "Unclean master daemon shutdown"
575             finally:
576               self.UpdateJobUnlocked(job)
577
578         logging.info("Job queue inspection finished")
579       finally:
580         self.release()
581     except:
582       self._wpool.TerminateWorkers()
583       raise
584
585   @utils.LockedMethod
586   @_RequireOpenQueue
587   def AddNode(self, node):
588     """Register a new node with the queue.
589
590     @type node: L{objects.Node}
591     @param node: the node object to be added
592
593     """
594     node_name = node.name
595     assert node_name != self._my_hostname
596
597     # Clean queue directory on added node
598     rpc.RpcRunner.call_jobqueue_purge(node_name)
599
600     if not node.master_candidate:
601       # remove if existing, ignoring errors
602       self._nodes.pop(node_name, None)
603       # and skip the replication of the job ids
604       return
605
606     # Upload the whole queue excluding archived jobs
607     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
608
609     # Upload current serial file
610     files.append(constants.JOB_QUEUE_SERIAL_FILE)
611
612     for file_name in files:
613       # Read file content
614       fd = open(file_name, "r")
615       try:
616         content = fd.read()
617       finally:
618         fd.close()
619
620       result = rpc.RpcRunner.call_jobqueue_update([node_name],
621                                                   [node.primary_ip],
622                                                   file_name, content)
623       if not result[node_name]:
624         logging.error("Failed to upload %s to %s", file_name, node_name)
625
626     self._nodes[node_name] = node.primary_ip
627
628   @utils.LockedMethod
629   @_RequireOpenQueue
630   def RemoveNode(self, node_name):
631     """Callback called when removing nodes from the cluster.
632
633     @type node_name: str
634     @param node_name: the name of the node to remove
635
636     """
637     try:
638       # The queue is removed by the "leave node" RPC call.
639       del self._nodes[node_name]
640     except KeyError:
641       pass
642
643   def _CheckRpcResult(self, result, nodes, failmsg):
644     """Verifies the status of an RPC call.
645
646     Since we aim to keep consistency should this node (the current
647     master) fail, we will log errors if our rpc fail, and especially
648     log the case when more than half of the nodes failes.
649
650     @param result: the data as returned from the rpc call
651     @type nodes: list
652     @param nodes: the list of nodes we made the call to
653     @type failmsg: str
654     @param failmsg: the identifier to be used for logging
655
656     """
657     failed = []
658     success = []
659
660     for node in nodes:
661       if result[node]:
662         success.append(node)
663       else:
664         failed.append(node)
665
666     if failed:
667       logging.error("%s failed on %s", failmsg, ", ".join(failed))
668
669     # +1 for the master node
670     if (len(success) + 1) < len(failed):
671       # TODO: Handle failing nodes
672       logging.error("More than half of the nodes failed")
673
674   def _GetNodeIp(self):
675     """Helper for returning the node name/ip list.
676
677     @rtype: (list, list)
678     @return: a tuple of two lists, the first one with the node
679         names and the second one with the node addresses
680
681     """
682     name_list = self._nodes.keys()
683     addr_list = [self._nodes[name] for name in name_list]
684     return name_list, addr_list
685
686   def _WriteAndReplicateFileUnlocked(self, file_name, data):
687     """Writes a file locally and then replicates it to all nodes.
688
689     This function will replace the contents of a file on the local
690     node and then replicate it to all the other nodes we have.
691
692     @type file_name: str
693     @param file_name: the path of the file to be replicated
694     @type data: str
695     @param data: the new contents of the file
696
697     """
698     utils.WriteFile(file_name, data=data)
699
700     names, addrs = self._GetNodeIp()
701     result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
702     self._CheckRpcResult(result, self._nodes,
703                          "Updating %s" % file_name)
704
705   def _RenameFileUnlocked(self, old, new):
706     """Renames a file locally and then replicate the change.
707
708     This function will rename a file in the local queue directory
709     and then replicate this rename to all the other nodes we have.
710
711     @type old: str
712     @param old: the current name of the file
713     @type new: str
714     @param new: the new name of the file
715
716     """
717     os.rename(old, new)
718
719     names, addrs = self._GetNodeIp()
720     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, old, new)
721     self._CheckRpcResult(result, self._nodes,
722                          "Moving %s to %s" % (old, new))
723
724   def _FormatJobID(self, job_id):
725     """Convert a job ID to string format.
726
727     Currently this just does C{str(job_id)} after performing some
728     checks, but if we want to change the job id format this will
729     abstract this change.
730
731     @type job_id: int or long
732     @param job_id: the numeric job id
733     @rtype: str
734     @return: the formatted job id
735
736     """
737     if not isinstance(job_id, (int, long)):
738       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
739     if job_id < 0:
740       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
741
742     return str(job_id)
743
744   def _NewSerialUnlocked(self):
745     """Generates a new job identifier.
746
747     Job identifiers are unique during the lifetime of a cluster.
748
749     @rtype: str
750     @return: a string representing the job identifier.
751
752     """
753     # New number
754     serial = self._last_serial + 1
755
756     # Write to file
757     self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
758                                         "%s\n" % serial)
759
760     # Keep it only if we were able to write the file
761     self._last_serial = serial
762
763     return self._FormatJobID(serial)
764
765   @staticmethod
766   def _GetJobPath(job_id):
767     """Returns the job file for a given job id.
768
769     @type job_id: str
770     @param job_id: the job identifier
771     @rtype: str
772     @return: the path to the job file
773
774     """
775     return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
776
777   @staticmethod
778   def _GetArchivedJobPath(job_id):
779     """Returns the archived job file for a give job id.
780
781     @type job_id: str
782     @param job_id: the job identifier
783     @rtype: str
784     @return: the path to the archived job file
785
786     """
787     return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
788
789   @classmethod
790   def _ExtractJobID(cls, name):
791     """Extract the job id from a filename.
792
793     @type name: str
794     @param name: the job filename
795     @rtype: job id or None
796     @return: the job id corresponding to the given filename,
797         or None if the filename does not represent a valid
798         job file
799
800     """
801     m = cls._RE_JOB_FILE.match(name)
802     if m:
803       return m.group(1)
804     else:
805       return None
806
807   def _GetJobIDsUnlocked(self, archived=False):
808     """Return all known job IDs.
809
810     If the parameter archived is True, archived jobs IDs will be
811     included. Currently this argument is unused.
812
813     The method only looks at disk because it's a requirement that all
814     jobs are present on disk (so in the _memcache we don't have any
815     extra IDs).
816
817     @rtype: list
818     @return: the list of job IDs
819
820     """
821     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
822     jlist = utils.NiceSort(jlist)
823     return jlist
824
825   def _ListJobFiles(self):
826     """Returns the list of current job files.
827
828     @rtype: list
829     @return: the list of job file names
830
831     """
832     return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
833             if self._RE_JOB_FILE.match(name)]
834
835   def _LoadJobUnlocked(self, job_id):
836     """Loads a job from the disk or memory.
837
838     Given a job id, this will return the cached job object if
839     existing, or try to load the job from the disk. If loading from
840     disk, it will also add the job to the cache.
841
842     @param job_id: the job id
843     @rtype: L{_QueuedJob} or None
844     @return: either None or the job object
845
846     """
847     job = self._memcache.get(job_id, None)
848     if job:
849       logging.debug("Found job %s in memcache", job_id)
850       return job
851
852     filepath = self._GetJobPath(job_id)
853     logging.debug("Loading job from %s", filepath)
854     try:
855       fd = open(filepath, "r")
856     except IOError, err:
857       if err.errno in (errno.ENOENT, ):
858         return None
859       raise
860     try:
861       data = serializer.LoadJson(fd.read())
862     finally:
863       fd.close()
864
865     try:
866       job = _QueuedJob.Restore(self, data)
867     except Exception, err:
868       new_path = self._GetArchivedJobPath(job_id)
869       if filepath == new_path:
870         # job already archived (future case)
871         logging.exception("Can't parse job %s", job_id)
872       else:
873         # non-archived case
874         logging.exception("Can't parse job %s, will archive.", job_id)
875         self._RenameFileUnlocked(filepath, new_path)
876       return None
877
878     self._memcache[job_id] = job
879     logging.debug("Added job %s to the cache", job_id)
880     return job
881
882   def _GetJobsUnlocked(self, job_ids):
883     """Return a list of jobs based on their IDs.
884
885     @type job_ids: list
886     @param job_ids: either an empty list (meaning all jobs),
887         or a list of job IDs
888     @rtype: list
889     @return: the list of job objects
890
891     """
892     if not job_ids:
893       job_ids = self._GetJobIDsUnlocked()
894
895     return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
896
897   @staticmethod
898   def _IsQueueMarkedDrain():
899     """Check if the queue is marked from drain.
900
901     This currently uses the queue drain file, which makes it a
902     per-node flag. In the future this can be moved to the config file.
903
904     @rtype: boolean
905     @return: True of the job queue is marked for draining
906
907     """
908     return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
909
910   @staticmethod
911   def SetDrainFlag(drain_flag):
912     """Sets the drain flag for the queue.
913
914     This is similar to the function L{backend.JobQueueSetDrainFlag},
915     and in the future we might merge them.
916
917     @type drain_flag: boolean
918     @param drain_flag: wheter to set or unset the drain flag
919
920     """
921     if drain_flag:
922       utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
923     else:
924       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
925     return True
926
927   @utils.LockedMethod
928   @_RequireOpenQueue
929   def SubmitJob(self, ops):
930     """Create and store a new job.
931
932     This enters the job into our job queue and also puts it on the new
933     queue, in order for it to be picked up by the queue processors.
934
935     @type ops: list
936     @param ops: The list of OpCodes that will become the new job.
937     @rtype: job ID
938     @return: the job ID of the newly created job
939     @raise errors.JobQueueDrainError: if the job is marked for draining
940
941     """
942     if self._IsQueueMarkedDrain():
943       raise errors.JobQueueDrainError()
944     # Get job identifier
945     job_id = self._NewSerialUnlocked()
946     job = _QueuedJob(self, job_id, ops)
947
948     # Write to disk
949     self.UpdateJobUnlocked(job)
950
951     logging.debug("Adding new job %s to the cache", job_id)
952     self._memcache[job_id] = job
953
954     # Add to worker pool
955     self._wpool.AddTask(job)
956
957     return job.id
958
959   @_RequireOpenQueue
960   def UpdateJobUnlocked(self, job):
961     """Update a job's on disk storage.
962
963     After a job has been modified, this function needs to be called in
964     order to write the changes to disk and replicate them to the other
965     nodes.
966
967     @type job: L{_QueuedJob}
968     @param job: the changed job
969
970     """
971     filename = self._GetJobPath(job.id)
972     data = serializer.DumpJson(job.Serialize(), indent=False)
973     logging.debug("Writing job %s to %s", job.id, filename)
974     self._WriteAndReplicateFileUnlocked(filename, data)
975
976     # Notify waiters about potential changes
977     job.change.notifyAll()
978
979   @utils.LockedMethod
980   @_RequireOpenQueue
981   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
982                         timeout):
983     """Waits for changes in a job.
984
985     @type job_id: string
986     @param job_id: Job identifier
987     @type fields: list of strings
988     @param fields: Which fields to check for changes
989     @type prev_job_info: list or None
990     @param prev_job_info: Last job information returned
991     @type prev_log_serial: int
992     @param prev_log_serial: Last job message serial number
993     @type timeout: float
994     @param timeout: maximum time to wait
995     @rtype: tuple (job info, log entries)
996     @return: a tuple of the job information as required via
997         the fields parameter, and the log entries as a list
998
999         if the job has not changed and the timeout has expired,
1000         we instead return a special value,
1001         L{constants.JOB_NOTCHANGED}, which should be interpreted
1002         as such by the clients
1003
1004     """
1005     logging.debug("Waiting for changes in job %s", job_id)
1006     end_time = time.time() + timeout
1007     while True:
1008       delta_time = end_time - time.time()
1009       if delta_time < 0:
1010         return constants.JOB_NOTCHANGED
1011
1012       job = self._LoadJobUnlocked(job_id)
1013       if not job:
1014         logging.debug("Job %s not found", job_id)
1015         break
1016
1017       status = job.CalcStatus()
1018       job_info = self._GetJobInfoUnlocked(job, fields)
1019       log_entries = job.GetLogEntries(prev_log_serial)
1020
1021       # Serializing and deserializing data can cause type changes (e.g. from
1022       # tuple to list) or precision loss. We're doing it here so that we get
1023       # the same modifications as the data received from the client. Without
1024       # this, the comparison afterwards might fail without the data being
1025       # significantly different.
1026       job_info = serializer.LoadJson(serializer.DumpJson(job_info))
1027       log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
1028
1029       if status not in (constants.JOB_STATUS_QUEUED,
1030                         constants.JOB_STATUS_RUNNING,
1031                         constants.JOB_STATUS_WAITLOCK):
1032         # Don't even try to wait if the job is no longer running, there will be
1033         # no changes.
1034         break
1035
1036       if (prev_job_info != job_info or
1037           (log_entries and prev_log_serial != log_entries[0][0])):
1038         break
1039
1040       logging.debug("Waiting again")
1041
1042       # Release the queue lock while waiting
1043       job.change.wait(delta_time)
1044
1045     logging.debug("Job %s changed", job_id)
1046
1047     return (job_info, log_entries)
1048
1049   @utils.LockedMethod
1050   @_RequireOpenQueue
1051   def CancelJob(self, job_id):
1052     """Cancels a job.
1053
1054     This will only succeed if the job has not started yet.
1055
1056     @type job_id: string
1057     @param job_id: job ID of job to be cancelled.
1058
1059     """
1060     logging.info("Cancelling job %s", job_id)
1061
1062     job = self._LoadJobUnlocked(job_id)
1063     if not job:
1064       logging.debug("Job %s not found", job_id)
1065       return (False, "Job %s not found" % job_id)
1066
1067     job_status = job.CalcStatus()
1068
1069     if job_status not in (constants.JOB_STATUS_QUEUED,
1070                           constants.JOB_STATUS_WAITLOCK):
1071       logging.debug("Job %s is no longer in the queue", job.id)
1072       return (False, "Job %s is no longer in the queue" % job.id)
1073
1074     if job_status == constants.JOB_STATUS_QUEUED:
1075       self.CancelJobUnlocked(job)
1076       return (True, "Job %s canceled" % job.id)
1077
1078     elif job_status == constants.JOB_STATUS_WAITLOCK:
1079       # The worker will notice the new status and cancel the job
1080       try:
1081         for op in job.ops:
1082           op.status = constants.OP_STATUS_CANCELING
1083       finally:
1084         self.UpdateJobUnlocked(job)
1085       return (True, "Job %s will be canceled" % job.id)
1086
1087   @_RequireOpenQueue
1088   def CancelJobUnlocked(self, job):
1089     """Marks a job as canceled.
1090
1091     """
1092     try:
1093       for op in job.ops:
1094         op.status = constants.OP_STATUS_ERROR
1095         op.result = "Job canceled by request"
1096     finally:
1097       self.UpdateJobUnlocked(job)
1098
1099   @_RequireOpenQueue
1100   def _ArchiveJobUnlocked(self, job_id):
1101     """Archives a job.
1102
1103     @type job_id: string
1104     @param job_id: Job ID of job to be archived.
1105
1106     """
1107     logging.info("Archiving job %s", job_id)
1108
1109     job = self._LoadJobUnlocked(job_id)
1110     if not job:
1111       logging.debug("Job %s not found", job_id)
1112       return
1113
1114     if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1115                                 constants.JOB_STATUS_SUCCESS,
1116                                 constants.JOB_STATUS_ERROR):
1117       logging.debug("Job %s is not yet done", job.id)
1118       return
1119
1120     old = self._GetJobPath(job.id)
1121     new = self._GetArchivedJobPath(job.id)
1122
1123     self._RenameFileUnlocked(old, new)
1124
1125     logging.debug("Successfully archived job %s", job.id)
1126
1127   @utils.LockedMethod
1128   @_RequireOpenQueue
1129   def ArchiveJob(self, job_id):
1130     """Archives a job.
1131
1132     This is just a wrapper over L{_ArchiveJobUnlocked}.
1133
1134     @type job_id: string
1135     @param job_id: Job ID of job to be archived.
1136
1137     """
1138     return self._ArchiveJobUnlocked(job_id)
1139
1140   @utils.LockedMethod
1141   @_RequireOpenQueue
1142   def AutoArchiveJobs(self, age):
1143     """Archives all jobs based on age.
1144
1145     The method will archive all jobs which are older than the age
1146     parameter. For jobs that don't have an end timestamp, the start
1147     timestamp will be considered. The special '-1' age will cause
1148     archival of all jobs (that are not running or queued).
1149
1150     @type age: int
1151     @param age: the minimum age in seconds
1152
1153     """
1154     logging.info("Archiving jobs with age more than %s seconds", age)
1155
1156     now = time.time()
1157     for jid in self._GetJobIDsUnlocked(archived=False):
1158       job = self._LoadJobUnlocked(jid)
1159       if job.CalcStatus() not in (constants.OP_STATUS_SUCCESS,
1160                                   constants.OP_STATUS_ERROR,
1161                                   constants.OP_STATUS_CANCELED):
1162         continue
1163       if job.end_timestamp is None:
1164         if job.start_timestamp is None:
1165           job_age = job.received_timestamp
1166         else:
1167           job_age = job.start_timestamp
1168       else:
1169         job_age = job.end_timestamp
1170
1171       if age == -1 or now - job_age[0] > age:
1172         self._ArchiveJobUnlocked(jid)
1173
1174   def _GetJobInfoUnlocked(self, job, fields):
1175     """Returns information about a job.
1176
1177     @type job: L{_QueuedJob}
1178     @param job: the job which we query
1179     @type fields: list
1180     @param fields: names of fields to return
1181     @rtype: list
1182     @return: list with one element for each field
1183     @raise errors.OpExecError: when an invalid field
1184         has been passed
1185
1186     """
1187     row = []
1188     for fname in fields:
1189       if fname == "id":
1190         row.append(job.id)
1191       elif fname == "status":
1192         row.append(job.CalcStatus())
1193       elif fname == "ops":
1194         row.append([op.input.__getstate__() for op in job.ops])
1195       elif fname == "opresult":
1196         row.append([op.result for op in job.ops])
1197       elif fname == "opstatus":
1198         row.append([op.status for op in job.ops])
1199       elif fname == "oplog":
1200         row.append([op.log for op in job.ops])
1201       elif fname == "opstart":
1202         row.append([op.start_timestamp for op in job.ops])
1203       elif fname == "opend":
1204         row.append([op.end_timestamp for op in job.ops])
1205       elif fname == "received_ts":
1206         row.append(job.received_timestamp)
1207       elif fname == "start_ts":
1208         row.append(job.start_timestamp)
1209       elif fname == "end_ts":
1210         row.append(job.end_timestamp)
1211       elif fname == "summary":
1212         row.append([op.input.Summary() for op in job.ops])
1213       else:
1214         raise errors.OpExecError("Invalid job query field '%s'" % fname)
1215     return row
1216
1217   @utils.LockedMethod
1218   @_RequireOpenQueue
1219   def QueryJobs(self, job_ids, fields):
1220     """Returns a list of jobs in queue.
1221
1222     This is a wrapper of L{_GetJobsUnlocked}, which actually does the
1223     processing for each job.
1224
1225     @type job_ids: list
1226     @param job_ids: sequence of job identifiers or None for all
1227     @type fields: list
1228     @param fields: names of fields to return
1229     @rtype: list
1230     @return: list one element per job, each element being list with
1231         the requested fields
1232
1233     """
1234     jobs = []
1235
1236     for job in self._GetJobsUnlocked(job_ids):
1237       if job is None:
1238         jobs.append(None)
1239       else:
1240         jobs.append(self._GetJobInfoUnlocked(job, fields))
1241
1242     return jobs
1243
1244   @utils.LockedMethod
1245   @_RequireOpenQueue
1246   def Shutdown(self):
1247     """Stops the job queue.
1248
1249     This shutdowns all the worker threads an closes the queue.
1250
1251     """
1252     self._wpool.TerminateWorkers()
1253
1254     self._queue_lock.Close()
1255     self._queue_lock = None