e06c5f866550660baa71c96d6912234a57c1950c
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import os
33 import logging
34 import threading
35 import errno
36 import re
37 import time
38 import weakref
39
40 from ganeti import constants
41 from ganeti import serializer
42 from ganeti import workerpool
43 from ganeti import opcodes
44 from ganeti import errors
45 from ganeti import mcpu
46 from ganeti import utils
47 from ganeti import jstore
48 from ganeti import rpc
49
50
51 JOBQUEUE_THREADS = 25
52 JOBS_PER_ARCHIVE_DIRECTORY = 10000
53
54
55 class CancelJob(Exception):
56   """Special exception to cancel a job.
57
58   """
59
60
61 def TimeStampNow():
62   """Returns the current timestamp.
63
64   @rtype: tuple
65   @return: the current time in the (seconds, microseconds) format
66
67   """
68   return utils.SplitTime(time.time())
69
70
71 class _QueuedOpCode(object):
72   """Encapsulates an opcode object.
73
74   @ivar log: holds the execution log and consists of tuples
75   of the form C{(log_serial, timestamp, level, message)}
76   @ivar input: the OpCode we encapsulate
77   @ivar status: the current status
78   @ivar result: the result of the LU execution
79   @ivar start_timestamp: timestamp for the start of the execution
80   @ivar stop_timestamp: timestamp for the end of the execution
81
82   """
83   __slots__ = ["input", "status", "result", "log",
84                "start_timestamp", "end_timestamp",
85                "__weakref__"]
86
87   def __init__(self, op):
88     """Constructor for the _QuededOpCode.
89
90     @type op: L{opcodes.OpCode}
91     @param op: the opcode we encapsulate
92
93     """
94     self.input = op
95     self.status = constants.OP_STATUS_QUEUED
96     self.result = None
97     self.log = []
98     self.start_timestamp = None
99     self.end_timestamp = None
100
101   @classmethod
102   def Restore(cls, state):
103     """Restore the _QueuedOpCode from the serialized form.
104
105     @type state: dict
106     @param state: the serialized state
107     @rtype: _QueuedOpCode
108     @return: a new _QueuedOpCode instance
109
110     """
111     obj = _QueuedOpCode.__new__(cls)
112     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
113     obj.status = state["status"]
114     obj.result = state["result"]
115     obj.log = state["log"]
116     obj.start_timestamp = state.get("start_timestamp", None)
117     obj.end_timestamp = state.get("end_timestamp", None)
118     return obj
119
120   def Serialize(self):
121     """Serializes this _QueuedOpCode.
122
123     @rtype: dict
124     @return: the dictionary holding the serialized state
125
126     """
127     return {
128       "input": self.input.__getstate__(),
129       "status": self.status,
130       "result": self.result,
131       "log": self.log,
132       "start_timestamp": self.start_timestamp,
133       "end_timestamp": self.end_timestamp,
134       }
135
136
137 class _QueuedJob(object):
138   """In-memory job representation.
139
140   This is what we use to track the user-submitted jobs. Locking must
141   be taken care of by users of this class.
142
143   @type queue: L{JobQueue}
144   @ivar queue: the parent queue
145   @ivar id: the job ID
146   @type ops: list
147   @ivar ops: the list of _QueuedOpCode that constitute the job
148   @type run_op_index: int
149   @ivar run_op_index: the currently executing opcode, or -1 if
150       we didn't yet start executing
151   @type log_serial: int
152   @ivar log_serial: holds the index for the next log entry
153   @ivar received_timestamp: the timestamp for when the job was received
154   @ivar start_timestmap: the timestamp for start of execution
155   @ivar end_timestamp: the timestamp for end of execution
156   @ivar change: a Condition variable we use for waiting for job changes
157
158   """
159   __slots__ = ["queue", "id", "ops", "run_op_index", "log_serial",
160                "received_timestamp", "start_timestamp", "end_timestamp",
161                "change",
162                "__weakref__"]
163
164   def __init__(self, queue, job_id, ops):
165     """Constructor for the _QueuedJob.
166
167     @type queue: L{JobQueue}
168     @param queue: our parent queue
169     @type job_id: job_id
170     @param job_id: our job id
171     @type ops: list
172     @param ops: the list of opcodes we hold, which will be encapsulated
173         in _QueuedOpCodes
174
175     """
176     if not ops:
177       # TODO: use a better exception
178       raise Exception("No opcodes")
179
180     self.queue = queue
181     self.id = job_id
182     self.ops = [_QueuedOpCode(op) for op in ops]
183     self.run_op_index = -1
184     self.log_serial = 0
185     self.received_timestamp = TimeStampNow()
186     self.start_timestamp = None
187     self.end_timestamp = None
188
189     # Condition to wait for changes
190     self.change = threading.Condition(self.queue._lock)
191
192   @classmethod
193   def Restore(cls, queue, state):
194     """Restore a _QueuedJob from serialized state:
195
196     @type queue: L{JobQueue}
197     @param queue: to which queue the restored job belongs
198     @type state: dict
199     @param state: the serialized state
200     @rtype: _JobQueue
201     @return: the restored _JobQueue instance
202
203     """
204     obj = _QueuedJob.__new__(cls)
205     obj.queue = queue
206     obj.id = state["id"]
207     obj.run_op_index = state["run_op_index"]
208     obj.received_timestamp = state.get("received_timestamp", None)
209     obj.start_timestamp = state.get("start_timestamp", None)
210     obj.end_timestamp = state.get("end_timestamp", None)
211
212     obj.ops = []
213     obj.log_serial = 0
214     for op_state in state["ops"]:
215       op = _QueuedOpCode.Restore(op_state)
216       for log_entry in op.log:
217         obj.log_serial = max(obj.log_serial, log_entry[0])
218       obj.ops.append(op)
219
220     # Condition to wait for changes
221     obj.change = threading.Condition(obj.queue._lock)
222
223     return obj
224
225   def Serialize(self):
226     """Serialize the _JobQueue instance.
227
228     @rtype: dict
229     @return: the serialized state
230
231     """
232     return {
233       "id": self.id,
234       "ops": [op.Serialize() for op in self.ops],
235       "run_op_index": self.run_op_index,
236       "start_timestamp": self.start_timestamp,
237       "end_timestamp": self.end_timestamp,
238       "received_timestamp": self.received_timestamp,
239       }
240
241   def CalcStatus(self):
242     """Compute the status of this job.
243
244     This function iterates over all the _QueuedOpCodes in the job and
245     based on their status, computes the job status.
246
247     The algorithm is:
248       - if we find a cancelled, or finished with error, the job
249         status will be the same
250       - otherwise, the last opcode with the status one of:
251           - waitlock
252           - canceling
253           - running
254
255         will determine the job status
256
257       - otherwise, it means either all opcodes are queued, or success,
258         and the job status will be the same
259
260     @return: the job status
261
262     """
263     status = constants.JOB_STATUS_QUEUED
264
265     all_success = True
266     for op in self.ops:
267       if op.status == constants.OP_STATUS_SUCCESS:
268         continue
269
270       all_success = False
271
272       if op.status == constants.OP_STATUS_QUEUED:
273         pass
274       elif op.status == constants.OP_STATUS_WAITLOCK:
275         status = constants.JOB_STATUS_WAITLOCK
276       elif op.status == constants.OP_STATUS_RUNNING:
277         status = constants.JOB_STATUS_RUNNING
278       elif op.status == constants.OP_STATUS_CANCELING:
279         status = constants.JOB_STATUS_CANCELING
280         break
281       elif op.status == constants.OP_STATUS_ERROR:
282         status = constants.JOB_STATUS_ERROR
283         # The whole job fails if one opcode failed
284         break
285       elif op.status == constants.OP_STATUS_CANCELED:
286         status = constants.OP_STATUS_CANCELED
287         break
288
289     if all_success:
290       status = constants.JOB_STATUS_SUCCESS
291
292     return status
293
294   def GetLogEntries(self, newer_than):
295     """Selectively returns the log entries.
296
297     @type newer_than: None or int
298     @param newer_than: if this is None, return all log entries,
299         otherwise return only the log entries with serial higher
300         than this value
301     @rtype: list
302     @return: the list of the log entries selected
303
304     """
305     if newer_than is None:
306       serial = -1
307     else:
308       serial = newer_than
309
310     entries = []
311     for op in self.ops:
312       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
313
314     return entries
315
316   def MarkUnfinishedOps(self, status, result):
317     """Mark unfinished opcodes with a given status and result.
318
319     This is an utility function for marking all running or waiting to
320     be run opcodes with a given status. Opcodes which are already
321     finalised are not changed.
322
323     @param status: a given opcode status
324     @param result: the opcode result
325
326     """
327     not_marked = True
328     for op in self.ops:
329       if op.status in constants.OPS_FINALIZED:
330         assert not_marked, "Finalized opcodes found after non-finalized ones"
331         continue
332       op.status = status
333       op.result = result
334       not_marked = False
335
336
337 class _JobQueueWorker(workerpool.BaseWorker):
338   """The actual job workers.
339
340   """
341   def _NotifyStart(self):
342     """Mark the opcode as running, not lock-waiting.
343
344     This is called from the mcpu code as a notifier function, when the
345     LU is finally about to start the Exec() method. Of course, to have
346     end-user visible results, the opcode must be initially (before
347     calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
348
349     """
350     assert self.queue, "Queue attribute is missing"
351     assert self.opcode, "Opcode attribute is missing"
352
353     self.queue.acquire()
354     try:
355       assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
356                                     constants.OP_STATUS_CANCELING)
357
358       # Cancel here if we were asked to
359       if self.opcode.status == constants.OP_STATUS_CANCELING:
360         raise CancelJob()
361
362       self.opcode.status = constants.OP_STATUS_RUNNING
363     finally:
364       self.queue.release()
365
366   def RunTask(self, job):
367     """Job executor.
368
369     This functions processes a job. It is closely tied to the _QueuedJob and
370     _QueuedOpCode classes.
371
372     @type job: L{_QueuedJob}
373     @param job: the job to be processed
374
375     """
376     logging.info("Worker %s processing job %s",
377                   self.worker_id, job.id)
378     proc = mcpu.Processor(self.pool.queue.context)
379     self.queue = queue = job.queue
380     try:
381       try:
382         count = len(job.ops)
383         for idx, op in enumerate(job.ops):
384           op_summary = op.input.Summary()
385           if op.status == constants.OP_STATUS_SUCCESS:
386             # this is a job that was partially completed before master
387             # daemon shutdown, so it can be expected that some opcodes
388             # are already completed successfully (if any did error
389             # out, then the whole job should have been aborted and not
390             # resubmitted for processing)
391             logging.info("Op %s/%s: opcode %s already processed, skipping",
392                          idx + 1, count, op_summary)
393             continue
394           try:
395             logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
396                          op_summary)
397
398             queue.acquire()
399             try:
400               if op.status == constants.OP_STATUS_CANCELED:
401                 raise CancelJob()
402               assert op.status == constants.OP_STATUS_QUEUED
403               job.run_op_index = idx
404               op.status = constants.OP_STATUS_WAITLOCK
405               op.result = None
406               op.start_timestamp = TimeStampNow()
407               if idx == 0: # first opcode
408                 job.start_timestamp = op.start_timestamp
409               queue.UpdateJobUnlocked(job)
410
411               input_opcode = op.input
412             finally:
413               queue.release()
414
415             def _Log(*args):
416               """Append a log entry.
417
418               """
419               assert len(args) < 3
420
421               if len(args) == 1:
422                 log_type = constants.ELOG_MESSAGE
423                 log_msg = args[0]
424               else:
425                 log_type, log_msg = args
426
427               # The time is split to make serialization easier and not lose
428               # precision.
429               timestamp = utils.SplitTime(time.time())
430
431               queue.acquire()
432               try:
433                 job.log_serial += 1
434                 op.log.append((job.log_serial, timestamp, log_type, log_msg))
435
436                 job.change.notifyAll()
437               finally:
438                 queue.release()
439
440             # Make sure not to hold lock while _Log is called
441             self.opcode = op
442             result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
443
444             queue.acquire()
445             try:
446               op.status = constants.OP_STATUS_SUCCESS
447               op.result = result
448               op.end_timestamp = TimeStampNow()
449               queue.UpdateJobUnlocked(job)
450             finally:
451               queue.release()
452
453             logging.info("Op %s/%s: Successfully finished opcode %s",
454                          idx + 1, count, op_summary)
455           except CancelJob:
456             # Will be handled further up
457             raise
458           except Exception, err:
459             queue.acquire()
460             try:
461               try:
462                 op.status = constants.OP_STATUS_ERROR
463                 op.result = str(err)
464                 op.end_timestamp = TimeStampNow()
465                 logging.info("Op %s/%s: Error in opcode %s: %s",
466                              idx + 1, count, op_summary, err)
467               finally:
468                 queue.UpdateJobUnlocked(job)
469             finally:
470               queue.release()
471             raise
472
473       except CancelJob:
474         queue.acquire()
475         try:
476           queue.CancelJobUnlocked(job)
477         finally:
478           queue.release()
479       except errors.GenericError, err:
480         logging.exception("Ganeti exception")
481       except:
482         logging.exception("Unhandled exception")
483     finally:
484       queue.acquire()
485       try:
486         try:
487           job.run_op_index = -1
488           job.end_timestamp = TimeStampNow()
489           queue.UpdateJobUnlocked(job)
490         finally:
491           job_id = job.id
492           status = job.CalcStatus()
493       finally:
494         queue.release()
495       logging.info("Worker %s finished job %s, status = %s",
496                    self.worker_id, job_id, status)
497
498
499 class _JobQueueWorkerPool(workerpool.WorkerPool):
500   """Simple class implementing a job-processing workerpool.
501
502   """
503   def __init__(self, queue):
504     super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
505                                               _JobQueueWorker)
506     self.queue = queue
507
508
509 class JobQueue(object):
510   """Queue used to manage the jobs.
511
512   @cvar _RE_JOB_FILE: regex matching the valid job file names
513
514   """
515   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
516
517   def _RequireOpenQueue(fn):
518     """Decorator for "public" functions.
519
520     This function should be used for all 'public' functions. That is,
521     functions usually called from other classes.
522
523     @warning: Use this decorator only after utils.LockedMethod!
524
525     Example::
526       @utils.LockedMethod
527       @_RequireOpenQueue
528       def Example(self):
529         pass
530
531     """
532     def wrapper(self, *args, **kwargs):
533       assert self._queue_lock is not None, "Queue should be open"
534       return fn(self, *args, **kwargs)
535     return wrapper
536
537   def __init__(self, context):
538     """Constructor for JobQueue.
539
540     The constructor will initialize the job queue object and then
541     start loading the current jobs from disk, either for starting them
542     (if they were queue) or for aborting them (if they were already
543     running).
544
545     @type context: GanetiContext
546     @param context: the context object for access to the configuration
547         data and other ganeti objects
548
549     """
550     self.context = context
551     self._memcache = weakref.WeakValueDictionary()
552     self._my_hostname = utils.HostInfo().name
553
554     # Locking
555     self._lock = threading.Lock()
556     self.acquire = self._lock.acquire
557     self.release = self._lock.release
558
559     # Initialize
560     self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
561
562     # Read serial file
563     self._last_serial = jstore.ReadSerial()
564     assert self._last_serial is not None, ("Serial file was modified between"
565                                            " check in jstore and here")
566
567     # Get initial list of nodes
568     self._nodes = dict((n.name, n.primary_ip)
569                        for n in self.context.cfg.GetAllNodesInfo().values()
570                        if n.master_candidate)
571
572     # Remove master node
573     try:
574       del self._nodes[self._my_hostname]
575     except KeyError:
576       pass
577
578     # TODO: Check consistency across nodes
579
580     # Setup worker pool
581     self._wpool = _JobQueueWorkerPool(self)
582     try:
583       # We need to lock here because WorkerPool.AddTask() may start a job while
584       # we're still doing our work.
585       self.acquire()
586       try:
587         logging.info("Inspecting job queue")
588
589         all_job_ids = self._GetJobIDsUnlocked()
590         jobs_count = len(all_job_ids)
591         lastinfo = time.time()
592         for idx, job_id in enumerate(all_job_ids):
593           # Give an update every 1000 jobs or 10 seconds
594           if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
595               idx == (jobs_count - 1)):
596             logging.info("Job queue inspection: %d/%d (%0.1f %%)",
597                          idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
598             lastinfo = time.time()
599
600           job = self._LoadJobUnlocked(job_id)
601
602           # a failure in loading the job can cause 'None' to be returned
603           if job is None:
604             continue
605
606           status = job.CalcStatus()
607
608           if status in (constants.JOB_STATUS_QUEUED, ):
609             self._wpool.AddTask(job)
610
611           elif status in (constants.JOB_STATUS_RUNNING,
612                           constants.JOB_STATUS_WAITLOCK,
613                           constants.JOB_STATUS_CANCELING):
614             logging.warning("Unfinished job %s found: %s", job.id, job)
615             try:
616               job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
617                                     "Unclean master daemon shutdown")
618             finally:
619               self.UpdateJobUnlocked(job)
620
621         logging.info("Job queue inspection finished")
622       finally:
623         self.release()
624     except:
625       self._wpool.TerminateWorkers()
626       raise
627
628   @utils.LockedMethod
629   @_RequireOpenQueue
630   def AddNode(self, node):
631     """Register a new node with the queue.
632
633     @type node: L{objects.Node}
634     @param node: the node object to be added
635
636     """
637     node_name = node.name
638     assert node_name != self._my_hostname
639
640     # Clean queue directory on added node
641     rpc.RpcRunner.call_jobqueue_purge(node_name)
642
643     if not node.master_candidate:
644       # remove if existing, ignoring errors
645       self._nodes.pop(node_name, None)
646       # and skip the replication of the job ids
647       return
648
649     # Upload the whole queue excluding archived jobs
650     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
651
652     # Upload current serial file
653     files.append(constants.JOB_QUEUE_SERIAL_FILE)
654
655     for file_name in files:
656       # Read file content
657       fd = open(file_name, "r")
658       try:
659         content = fd.read()
660       finally:
661         fd.close()
662
663       result = rpc.RpcRunner.call_jobqueue_update([node_name],
664                                                   [node.primary_ip],
665                                                   file_name, content)
666       if not result[node_name]:
667         logging.error("Failed to upload %s to %s", file_name, node_name)
668
669     self._nodes[node_name] = node.primary_ip
670
671   @utils.LockedMethod
672   @_RequireOpenQueue
673   def RemoveNode(self, node_name):
674     """Callback called when removing nodes from the cluster.
675
676     @type node_name: str
677     @param node_name: the name of the node to remove
678
679     """
680     try:
681       # The queue is removed by the "leave node" RPC call.
682       del self._nodes[node_name]
683     except KeyError:
684       pass
685
686   def _CheckRpcResult(self, result, nodes, failmsg):
687     """Verifies the status of an RPC call.
688
689     Since we aim to keep consistency should this node (the current
690     master) fail, we will log errors if our rpc fail, and especially
691     log the case when more than half of the nodes fails.
692
693     @param result: the data as returned from the rpc call
694     @type nodes: list
695     @param nodes: the list of nodes we made the call to
696     @type failmsg: str
697     @param failmsg: the identifier to be used for logging
698
699     """
700     failed = []
701     success = []
702
703     for node in nodes:
704       if result[node]:
705         success.append(node)
706       else:
707         failed.append(node)
708
709     if failed:
710       logging.error("%s failed on %s", failmsg, ", ".join(failed))
711
712     # +1 for the master node
713     if (len(success) + 1) < len(failed):
714       # TODO: Handle failing nodes
715       logging.error("More than half of the nodes failed")
716
717   def _GetNodeIp(self):
718     """Helper for returning the node name/ip list.
719
720     @rtype: (list, list)
721     @return: a tuple of two lists, the first one with the node
722         names and the second one with the node addresses
723
724     """
725     name_list = self._nodes.keys()
726     addr_list = [self._nodes[name] for name in name_list]
727     return name_list, addr_list
728
729   def _WriteAndReplicateFileUnlocked(self, file_name, data):
730     """Writes a file locally and then replicates it to all nodes.
731
732     This function will replace the contents of a file on the local
733     node and then replicate it to all the other nodes we have.
734
735     @type file_name: str
736     @param file_name: the path of the file to be replicated
737     @type data: str
738     @param data: the new contents of the file
739
740     """
741     utils.WriteFile(file_name, data=data)
742
743     names, addrs = self._GetNodeIp()
744     result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
745     self._CheckRpcResult(result, self._nodes,
746                          "Updating %s" % file_name)
747
748   def _RenameFilesUnlocked(self, rename):
749     """Renames a file locally and then replicate the change.
750
751     This function will rename a file in the local queue directory
752     and then replicate this rename to all the other nodes we have.
753
754     @type rename: list of (old, new)
755     @param rename: List containing tuples mapping old to new names
756
757     """
758     # Rename them locally
759     for old, new in rename:
760       utils.RenameFile(old, new, mkdir=True)
761
762     # ... and on all nodes
763     names, addrs = self._GetNodeIp()
764     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
765     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
766
767   def _FormatJobID(self, job_id):
768     """Convert a job ID to string format.
769
770     Currently this just does C{str(job_id)} after performing some
771     checks, but if we want to change the job id format this will
772     abstract this change.
773
774     @type job_id: int or long
775     @param job_id: the numeric job id
776     @rtype: str
777     @return: the formatted job id
778
779     """
780     if not isinstance(job_id, (int, long)):
781       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
782     if job_id < 0:
783       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
784
785     return str(job_id)
786
787   @classmethod
788   def _GetArchiveDirectory(cls, job_id):
789     """Returns the archive directory for a job.
790
791     @type job_id: str
792     @param job_id: Job identifier
793     @rtype: str
794     @return: Directory name
795
796     """
797     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
798
799   def _NewSerialUnlocked(self):
800     """Generates a new job identifier.
801
802     Job identifiers are unique during the lifetime of a cluster.
803
804     @rtype: str
805     @return: a string representing the job identifier.
806
807     """
808     # New number
809     serial = self._last_serial + 1
810
811     # Write to file
812     self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
813                                         "%s\n" % serial)
814
815     # Keep it only if we were able to write the file
816     self._last_serial = serial
817
818     return self._FormatJobID(serial)
819
820   @staticmethod
821   def _GetJobPath(job_id):
822     """Returns the job file for a given job id.
823
824     @type job_id: str
825     @param job_id: the job identifier
826     @rtype: str
827     @return: the path to the job file
828
829     """
830     return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
831
832   @classmethod
833   def _GetArchivedJobPath(cls, job_id):
834     """Returns the archived job file for a give job id.
835
836     @type job_id: str
837     @param job_id: the job identifier
838     @rtype: str
839     @return: the path to the archived job file
840
841     """
842     path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
843     return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
844
845   @classmethod
846   def _ExtractJobID(cls, name):
847     """Extract the job id from a filename.
848
849     @type name: str
850     @param name: the job filename
851     @rtype: job id or None
852     @return: the job id corresponding to the given filename,
853         or None if the filename does not represent a valid
854         job file
855
856     """
857     m = cls._RE_JOB_FILE.match(name)
858     if m:
859       return m.group(1)
860     else:
861       return None
862
863   def _GetJobIDsUnlocked(self, archived=False):
864     """Return all known job IDs.
865
866     If the parameter archived is True, archived jobs IDs will be
867     included. Currently this argument is unused.
868
869     The method only looks at disk because it's a requirement that all
870     jobs are present on disk (so in the _memcache we don't have any
871     extra IDs).
872
873     @rtype: list
874     @return: the list of job IDs
875
876     """
877     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
878     jlist = utils.NiceSort(jlist)
879     return jlist
880
881   def _ListJobFiles(self):
882     """Returns the list of current job files.
883
884     @rtype: list
885     @return: the list of job file names
886
887     """
888     return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
889             if self._RE_JOB_FILE.match(name)]
890
891   def _LoadJobUnlocked(self, job_id):
892     """Loads a job from the disk or memory.
893
894     Given a job id, this will return the cached job object if
895     existing, or try to load the job from the disk. If loading from
896     disk, it will also add the job to the cache.
897
898     @param job_id: the job id
899     @rtype: L{_QueuedJob} or None
900     @return: either None or the job object
901
902     """
903     job = self._memcache.get(job_id, None)
904     if job:
905       logging.debug("Found job %s in memcache", job_id)
906       return job
907
908     filepath = self._GetJobPath(job_id)
909     logging.debug("Loading job from %s", filepath)
910     try:
911       fd = open(filepath, "r")
912     except IOError, err:
913       if err.errno in (errno.ENOENT, ):
914         return None
915       raise
916     try:
917       data = serializer.LoadJson(fd.read())
918     finally:
919       fd.close()
920
921     try:
922       job = _QueuedJob.Restore(self, data)
923     except Exception, err:
924       new_path = self._GetArchivedJobPath(job_id)
925       if filepath == new_path:
926         # job already archived (future case)
927         logging.exception("Can't parse job %s", job_id)
928       else:
929         # non-archived case
930         logging.exception("Can't parse job %s, will archive.", job_id)
931         self._RenameFilesUnlocked([(filepath, new_path)])
932       return None
933
934     self._memcache[job_id] = job
935     logging.debug("Added job %s to the cache", job_id)
936     return job
937
938   def _GetJobsUnlocked(self, job_ids):
939     """Return a list of jobs based on their IDs.
940
941     @type job_ids: list
942     @param job_ids: either an empty list (meaning all jobs),
943         or a list of job IDs
944     @rtype: list
945     @return: the list of job objects
946
947     """
948     if not job_ids:
949       job_ids = self._GetJobIDsUnlocked()
950
951     return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
952
953   @staticmethod
954   def _IsQueueMarkedDrain():
955     """Check if the queue is marked from drain.
956
957     This currently uses the queue drain file, which makes it a
958     per-node flag. In the future this can be moved to the config file.
959
960     @rtype: boolean
961     @return: True of the job queue is marked for draining
962
963     """
964     return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
965
966   @staticmethod
967   def SetDrainFlag(drain_flag):
968     """Sets the drain flag for the queue.
969
970     This is similar to the function L{backend.JobQueueSetDrainFlag},
971     and in the future we might merge them.
972
973     @type drain_flag: boolean
974     @param drain_flag: Whether to set or unset the drain flag
975
976     """
977     if drain_flag:
978       utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
979     else:
980       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
981     return True
982
983   @_RequireOpenQueue
984   def _SubmitJobUnlocked(self, ops):
985     """Create and store a new job.
986
987     This enters the job into our job queue and also puts it on the new
988     queue, in order for it to be picked up by the queue processors.
989
990     @type ops: list
991     @param ops: The list of OpCodes that will become the new job.
992     @rtype: job ID
993     @return: the job ID of the newly created job
994     @raise errors.JobQueueDrainError: if the job is marked for draining
995
996     """
997     if self._IsQueueMarkedDrain():
998       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
999
1000     # Check job queue size
1001     size = len(self._ListJobFiles())
1002     if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
1003       # TODO: Autoarchive jobs. Make sure it's not done on every job
1004       # submission, though.
1005       #size = ...
1006       pass
1007
1008     if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1009       raise errors.JobQueueFull()
1010
1011     # Get job identifier
1012     job_id = self._NewSerialUnlocked()
1013     job = _QueuedJob(self, job_id, ops)
1014
1015     # Write to disk
1016     self.UpdateJobUnlocked(job)
1017
1018     logging.debug("Adding new job %s to the cache", job_id)
1019     self._memcache[job_id] = job
1020
1021     # Add to worker pool
1022     self._wpool.AddTask(job)
1023
1024     return job.id
1025
1026   @utils.LockedMethod
1027   @_RequireOpenQueue
1028   def SubmitJob(self, ops):
1029     """Create and store a new job.
1030
1031     @see: L{_SubmitJobUnlocked}
1032
1033     """
1034     return self._SubmitJobUnlocked(ops)
1035
1036   @utils.LockedMethod
1037   @_RequireOpenQueue
1038   def SubmitManyJobs(self, jobs):
1039     """Create and store multiple jobs.
1040
1041     @see: L{_SubmitJobUnlocked}
1042
1043     """
1044     results = []
1045     for ops in jobs:
1046       try:
1047         data = self._SubmitJobUnlocked(ops)
1048         status = True
1049       except errors.GenericError, err:
1050         data = str(err)
1051         status = False
1052       results.append((status, data))
1053
1054     return results
1055
1056
1057   @_RequireOpenQueue
1058   def UpdateJobUnlocked(self, job):
1059     """Update a job's on disk storage.
1060
1061     After a job has been modified, this function needs to be called in
1062     order to write the changes to disk and replicate them to the other
1063     nodes.
1064
1065     @type job: L{_QueuedJob}
1066     @param job: the changed job
1067
1068     """
1069     filename = self._GetJobPath(job.id)
1070     data = serializer.DumpJson(job.Serialize(), indent=False)
1071     logging.debug("Writing job %s to %s", job.id, filename)
1072     self._WriteAndReplicateFileUnlocked(filename, data)
1073
1074     # Notify waiters about potential changes
1075     job.change.notifyAll()
1076
1077   @utils.LockedMethod
1078   @_RequireOpenQueue
1079   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1080                         timeout):
1081     """Waits for changes in a job.
1082
1083     @type job_id: string
1084     @param job_id: Job identifier
1085     @type fields: list of strings
1086     @param fields: Which fields to check for changes
1087     @type prev_job_info: list or None
1088     @param prev_job_info: Last job information returned
1089     @type prev_log_serial: int
1090     @param prev_log_serial: Last job message serial number
1091     @type timeout: float
1092     @param timeout: maximum time to wait
1093     @rtype: tuple (job info, log entries)
1094     @return: a tuple of the job information as required via
1095         the fields parameter, and the log entries as a list
1096
1097         if the job has not changed and the timeout has expired,
1098         we instead return a special value,
1099         L{constants.JOB_NOTCHANGED}, which should be interpreted
1100         as such by the clients
1101
1102     """
1103     logging.debug("Waiting for changes in job %s", job_id)
1104
1105     job_info = None
1106     log_entries = None
1107
1108     end_time = time.time() + timeout
1109     while True:
1110       delta_time = end_time - time.time()
1111       if delta_time < 0:
1112         return constants.JOB_NOTCHANGED
1113
1114       job = self._LoadJobUnlocked(job_id)
1115       if not job:
1116         logging.debug("Job %s not found", job_id)
1117         break
1118
1119       status = job.CalcStatus()
1120       job_info = self._GetJobInfoUnlocked(job, fields)
1121       log_entries = job.GetLogEntries(prev_log_serial)
1122
1123       # Serializing and deserializing data can cause type changes (e.g. from
1124       # tuple to list) or precision loss. We're doing it here so that we get
1125       # the same modifications as the data received from the client. Without
1126       # this, the comparison afterwards might fail without the data being
1127       # significantly different.
1128       job_info = serializer.LoadJson(serializer.DumpJson(job_info))
1129       log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
1130
1131       if status not in (constants.JOB_STATUS_QUEUED,
1132                         constants.JOB_STATUS_RUNNING,
1133                         constants.JOB_STATUS_WAITLOCK):
1134         # Don't even try to wait if the job is no longer running, there will be
1135         # no changes.
1136         break
1137
1138       if (prev_job_info != job_info or
1139           (log_entries and prev_log_serial != log_entries[0][0])):
1140         break
1141
1142       logging.debug("Waiting again")
1143
1144       # Release the queue lock while waiting
1145       job.change.wait(delta_time)
1146
1147     logging.debug("Job %s changed", job_id)
1148
1149     if job_info is None and log_entries is None:
1150       return None
1151     else:
1152       return (job_info, log_entries)
1153
1154   @utils.LockedMethod
1155   @_RequireOpenQueue
1156   def CancelJob(self, job_id):
1157     """Cancels a job.
1158
1159     This will only succeed if the job has not started yet.
1160
1161     @type job_id: string
1162     @param job_id: job ID of job to be cancelled.
1163
1164     """
1165     logging.info("Cancelling job %s", job_id)
1166
1167     job = self._LoadJobUnlocked(job_id)
1168     if not job:
1169       logging.debug("Job %s not found", job_id)
1170       return (False, "Job %s not found" % job_id)
1171
1172     job_status = job.CalcStatus()
1173
1174     if job_status not in (constants.JOB_STATUS_QUEUED,
1175                           constants.JOB_STATUS_WAITLOCK):
1176       logging.debug("Job %s is no longer waiting in the queue", job.id)
1177       return (False, "Job %s is no longer waiting in the queue" % job.id)
1178
1179     if job_status == constants.JOB_STATUS_QUEUED:
1180       self.CancelJobUnlocked(job)
1181       return (True, "Job %s canceled" % job.id)
1182
1183     elif job_status == constants.JOB_STATUS_WAITLOCK:
1184       # The worker will notice the new status and cancel the job
1185       try:
1186         job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1187       finally:
1188         self.UpdateJobUnlocked(job)
1189       return (True, "Job %s will be canceled" % job.id)
1190
1191   @_RequireOpenQueue
1192   def CancelJobUnlocked(self, job):
1193     """Marks a job as canceled.
1194
1195     """
1196     try:
1197       job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1198                             "Job canceled by request")
1199     finally:
1200       self.UpdateJobUnlocked(job)
1201
1202   @_RequireOpenQueue
1203   def _ArchiveJobsUnlocked(self, jobs):
1204     """Archives jobs.
1205
1206     @type jobs: list of L{_QueuedJob}
1207     @param jobs: Job objects
1208     @rtype: int
1209     @return: Number of archived jobs
1210
1211     """
1212     archive_jobs = []
1213     rename_files = []
1214     for job in jobs:
1215       if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1216                                   constants.JOB_STATUS_SUCCESS,
1217                                   constants.JOB_STATUS_ERROR):
1218         logging.debug("Job %s is not yet done", job.id)
1219         continue
1220
1221       archive_jobs.append(job)
1222
1223       old = self._GetJobPath(job.id)
1224       new = self._GetArchivedJobPath(job.id)
1225       rename_files.append((old, new))
1226
1227     # TODO: What if 1..n files fail to rename?
1228     self._RenameFilesUnlocked(rename_files)
1229
1230     logging.debug("Successfully archived job(s) %s",
1231                   ", ".join(job.id for job in archive_jobs))
1232
1233     return len(archive_jobs)
1234
1235   @utils.LockedMethod
1236   @_RequireOpenQueue
1237   def ArchiveJob(self, job_id):
1238     """Archives a job.
1239
1240     This is just a wrapper over L{_ArchiveJobsUnlocked}.
1241
1242     @type job_id: string
1243     @param job_id: Job ID of job to be archived.
1244     @rtype: bool
1245     @return: Whether job was archived
1246
1247     """
1248     logging.info("Archiving job %s", job_id)
1249
1250     job = self._LoadJobUnlocked(job_id)
1251     if not job:
1252       logging.debug("Job %s not found", job_id)
1253       return False
1254
1255     return self._ArchiveJobsUnlocked([job]) == 1
1256
1257   @utils.LockedMethod
1258   @_RequireOpenQueue
1259   def AutoArchiveJobs(self, age, timeout):
1260     """Archives all jobs based on age.
1261
1262     The method will archive all jobs which are older than the age
1263     parameter. For jobs that don't have an end timestamp, the start
1264     timestamp will be considered. The special '-1' age will cause
1265     archival of all jobs (that are not running or queued).
1266
1267     @type age: int
1268     @param age: the minimum age in seconds
1269
1270     """
1271     logging.info("Archiving jobs with age more than %s seconds", age)
1272
1273     now = time.time()
1274     end_time = now + timeout
1275     archived_count = 0
1276     last_touched = 0
1277
1278     all_job_ids = self._GetJobIDsUnlocked(archived=False)
1279     pending = []
1280     for idx, job_id in enumerate(all_job_ids):
1281       last_touched = idx
1282
1283       # Not optimal because jobs could be pending
1284       # TODO: Measure average duration for job archival and take number of
1285       # pending jobs into account.
1286       if time.time() > end_time:
1287         break
1288
1289       # Returns None if the job failed to load
1290       job = self._LoadJobUnlocked(job_id)
1291       if job:
1292         if job.end_timestamp is None:
1293           if job.start_timestamp is None:
1294             job_age = job.received_timestamp
1295           else:
1296             job_age = job.start_timestamp
1297         else:
1298           job_age = job.end_timestamp
1299
1300         if age == -1 or now - job_age[0] > age:
1301           pending.append(job)
1302
1303           # Archive 10 jobs at a time
1304           if len(pending) >= 10:
1305             archived_count += self._ArchiveJobsUnlocked(pending)
1306             pending = []
1307
1308     if pending:
1309       archived_count += self._ArchiveJobsUnlocked(pending)
1310
1311     return (archived_count, len(all_job_ids) - last_touched - 1)
1312
1313   def _GetJobInfoUnlocked(self, job, fields):
1314     """Returns information about a job.
1315
1316     @type job: L{_QueuedJob}
1317     @param job: the job which we query
1318     @type fields: list
1319     @param fields: names of fields to return
1320     @rtype: list
1321     @return: list with one element for each field
1322     @raise errors.OpExecError: when an invalid field
1323         has been passed
1324
1325     """
1326     row = []
1327     for fname in fields:
1328       if fname == "id":
1329         row.append(job.id)
1330       elif fname == "status":
1331         row.append(job.CalcStatus())
1332       elif fname == "ops":
1333         row.append([op.input.__getstate__() for op in job.ops])
1334       elif fname == "opresult":
1335         row.append([op.result for op in job.ops])
1336       elif fname == "opstatus":
1337         row.append([op.status for op in job.ops])
1338       elif fname == "oplog":
1339         row.append([op.log for op in job.ops])
1340       elif fname == "opstart":
1341         row.append([op.start_timestamp for op in job.ops])
1342       elif fname == "opend":
1343         row.append([op.end_timestamp for op in job.ops])
1344       elif fname == "received_ts":
1345         row.append(job.received_timestamp)
1346       elif fname == "start_ts":
1347         row.append(job.start_timestamp)
1348       elif fname == "end_ts":
1349         row.append(job.end_timestamp)
1350       elif fname == "summary":
1351         row.append([op.input.Summary() for op in job.ops])
1352       else:
1353         raise errors.OpExecError("Invalid job query field '%s'" % fname)
1354     return row
1355
1356   @utils.LockedMethod
1357   @_RequireOpenQueue
1358   def QueryJobs(self, job_ids, fields):
1359     """Returns a list of jobs in queue.
1360
1361     This is a wrapper of L{_GetJobsUnlocked}, which actually does the
1362     processing for each job.
1363
1364     @type job_ids: list
1365     @param job_ids: sequence of job identifiers or None for all
1366     @type fields: list
1367     @param fields: names of fields to return
1368     @rtype: list
1369     @return: list one element per job, each element being list with
1370         the requested fields
1371
1372     """
1373     jobs = []
1374
1375     for job in self._GetJobsUnlocked(job_ids):
1376       if job is None:
1377         jobs.append(None)
1378       else:
1379         jobs.append(self._GetJobInfoUnlocked(job, fields))
1380
1381     return jobs
1382
1383   @utils.LockedMethod
1384   @_RequireOpenQueue
1385   def Shutdown(self):
1386     """Stops the job queue.
1387
1388     This shutdowns all the worker threads an closes the queue.
1389
1390     """
1391     self._wpool.TerminateWorkers()
1392
1393     self._queue_lock.Close()
1394     self._queue_lock = None