Fix another issue with hypervisor_name change
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import os
33 import logging
34 import threading
35 import errno
36 import re
37 import time
38 import weakref
39
40 from ganeti import constants
41 from ganeti import serializer
42 from ganeti import workerpool
43 from ganeti import opcodes
44 from ganeti import errors
45 from ganeti import mcpu
46 from ganeti import utils
47 from ganeti import jstore
48 from ganeti import rpc
49
50
51 JOBQUEUE_THREADS = 25
52 JOBS_PER_ARCHIVE_DIRECTORY = 10000
53
54
55 class CancelJob(Exception):
56   """Special exception to cancel a job.
57
58   """
59
60
61 def TimeStampNow():
62   """Returns the current timestamp.
63
64   @rtype: tuple
65   @return: the current time in the (seconds, microseconds) format
66
67   """
68   return utils.SplitTime(time.time())
69
70
71 class _QueuedOpCode(object):
72   """Encapsulates an opcode object.
73
74   @ivar log: holds the execution log and consists of tuples
75   of the form C{(log_serial, timestamp, level, message)}
76   @ivar input: the OpCode we encapsulate
77   @ivar status: the current status
78   @ivar result: the result of the LU execution
79   @ivar start_timestamp: timestamp for the start of the execution
80   @ivar stop_timestamp: timestamp for the end of the execution
81
82   """
83   def __init__(self, op):
84     """Constructor for the _QuededOpCode.
85
86     @type op: L{opcodes.OpCode}
87     @param op: the opcode we encapsulate
88
89     """
90     self.input = op
91     self.status = constants.OP_STATUS_QUEUED
92     self.result = None
93     self.log = []
94     self.start_timestamp = None
95     self.end_timestamp = None
96
97   @classmethod
98   def Restore(cls, state):
99     """Restore the _QueuedOpCode from the serialized form.
100
101     @type state: dict
102     @param state: the serialized state
103     @rtype: _QueuedOpCode
104     @return: a new _QueuedOpCode instance
105
106     """
107     obj = _QueuedOpCode.__new__(cls)
108     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
109     obj.status = state["status"]
110     obj.result = state["result"]
111     obj.log = state["log"]
112     obj.start_timestamp = state.get("start_timestamp", None)
113     obj.end_timestamp = state.get("end_timestamp", None)
114     return obj
115
116   def Serialize(self):
117     """Serializes this _QueuedOpCode.
118
119     @rtype: dict
120     @return: the dictionary holding the serialized state
121
122     """
123     return {
124       "input": self.input.__getstate__(),
125       "status": self.status,
126       "result": self.result,
127       "log": self.log,
128       "start_timestamp": self.start_timestamp,
129       "end_timestamp": self.end_timestamp,
130       }
131
132
133 class _QueuedJob(object):
134   """In-memory job representation.
135
136   This is what we use to track the user-submitted jobs. Locking must
137   be taken care of by users of this class.
138
139   @type queue: L{JobQueue}
140   @ivar queue: the parent queue
141   @ivar id: the job ID
142   @type ops: list
143   @ivar ops: the list of _QueuedOpCode that constitute the job
144   @type run_op_index: int
145   @ivar run_op_index: the currently executing opcode, or -1 if
146       we didn't yet start executing
147   @type log_serial: int
148   @ivar log_serial: holds the index for the next log entry
149   @ivar received_timestamp: the timestamp for when the job was received
150   @ivar start_timestmap: the timestamp for start of execution
151   @ivar end_timestamp: the timestamp for end of execution
152   @ivar change: a Condition variable we use for waiting for job changes
153
154   """
155   def __init__(self, queue, job_id, ops):
156     """Constructor for the _QueuedJob.
157
158     @type queue: L{JobQueue}
159     @param queue: our parent queue
160     @type job_id: job_id
161     @param job_id: our job id
162     @type ops: list
163     @param ops: the list of opcodes we hold, which will be encapsulated
164         in _QueuedOpCodes
165
166     """
167     if not ops:
168       # TODO: use a better exception
169       raise Exception("No opcodes")
170
171     self.queue = queue
172     self.id = job_id
173     self.ops = [_QueuedOpCode(op) for op in ops]
174     self.run_op_index = -1
175     self.log_serial = 0
176     self.received_timestamp = TimeStampNow()
177     self.start_timestamp = None
178     self.end_timestamp = None
179
180     # Condition to wait for changes
181     self.change = threading.Condition(self.queue._lock)
182
183   @classmethod
184   def Restore(cls, queue, state):
185     """Restore a _QueuedJob from serialized state:
186
187     @type queue: L{JobQueue}
188     @param queue: to which queue the restored job belongs
189     @type state: dict
190     @param state: the serialized state
191     @rtype: _JobQueue
192     @return: the restored _JobQueue instance
193
194     """
195     obj = _QueuedJob.__new__(cls)
196     obj.queue = queue
197     obj.id = state["id"]
198     obj.run_op_index = state["run_op_index"]
199     obj.received_timestamp = state.get("received_timestamp", None)
200     obj.start_timestamp = state.get("start_timestamp", None)
201     obj.end_timestamp = state.get("end_timestamp", None)
202
203     obj.ops = []
204     obj.log_serial = 0
205     for op_state in state["ops"]:
206       op = _QueuedOpCode.Restore(op_state)
207       for log_entry in op.log:
208         obj.log_serial = max(obj.log_serial, log_entry[0])
209       obj.ops.append(op)
210
211     # Condition to wait for changes
212     obj.change = threading.Condition(obj.queue._lock)
213
214     return obj
215
216   def Serialize(self):
217     """Serialize the _JobQueue instance.
218
219     @rtype: dict
220     @return: the serialized state
221
222     """
223     return {
224       "id": self.id,
225       "ops": [op.Serialize() for op in self.ops],
226       "run_op_index": self.run_op_index,
227       "start_timestamp": self.start_timestamp,
228       "end_timestamp": self.end_timestamp,
229       "received_timestamp": self.received_timestamp,
230       }
231
232   def CalcStatus(self):
233     """Compute the status of this job.
234
235     This function iterates over all the _QueuedOpCodes in the job and
236     based on their status, computes the job status.
237
238     The algorithm is:
239       - if we find a cancelled, or finished with error, the job
240         status will be the same
241       - otherwise, the last opcode with the status one of:
242           - waitlock
243           - canceling
244           - running
245
246         will determine the job status
247
248       - otherwise, it means either all opcodes are queued, or success,
249         and the job status will be the same
250
251     @return: the job status
252
253     """
254     status = constants.JOB_STATUS_QUEUED
255
256     all_success = True
257     for op in self.ops:
258       if op.status == constants.OP_STATUS_SUCCESS:
259         continue
260
261       all_success = False
262
263       if op.status == constants.OP_STATUS_QUEUED:
264         pass
265       elif op.status == constants.OP_STATUS_WAITLOCK:
266         status = constants.JOB_STATUS_WAITLOCK
267       elif op.status == constants.OP_STATUS_RUNNING:
268         status = constants.JOB_STATUS_RUNNING
269       elif op.status == constants.OP_STATUS_CANCELING:
270         status = constants.JOB_STATUS_CANCELING
271         break
272       elif op.status == constants.OP_STATUS_ERROR:
273         status = constants.JOB_STATUS_ERROR
274         # The whole job fails if one opcode failed
275         break
276       elif op.status == constants.OP_STATUS_CANCELED:
277         status = constants.OP_STATUS_CANCELED
278         break
279
280     if all_success:
281       status = constants.JOB_STATUS_SUCCESS
282
283     return status
284
285   def GetLogEntries(self, newer_than):
286     """Selectively returns the log entries.
287
288     @type newer_than: None or int
289     @param newer_than: if this is None, return all log entries,
290         otherwise return only the log entries with serial higher
291         than this value
292     @rtype: list
293     @return: the list of the log entries selected
294
295     """
296     if newer_than is None:
297       serial = -1
298     else:
299       serial = newer_than
300
301     entries = []
302     for op in self.ops:
303       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
304
305     return entries
306
307
308 class _JobQueueWorker(workerpool.BaseWorker):
309   """The actual job workers.
310
311   """
312   def _NotifyStart(self):
313     """Mark the opcode as running, not lock-waiting.
314
315     This is called from the mcpu code as a notifier function, when the
316     LU is finally about to start the Exec() method. Of course, to have
317     end-user visible results, the opcode must be initially (before
318     calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
319
320     """
321     assert self.queue, "Queue attribute is missing"
322     assert self.opcode, "Opcode attribute is missing"
323
324     self.queue.acquire()
325     try:
326       assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
327                                     constants.OP_STATUS_CANCELING)
328
329       # Cancel here if we were asked to
330       if self.opcode.status == constants.OP_STATUS_CANCELING:
331         raise CancelJob()
332
333       self.opcode.status = constants.OP_STATUS_RUNNING
334     finally:
335       self.queue.release()
336
337   def RunTask(self, job):
338     """Job executor.
339
340     This functions processes a job. It is closely tied to the _QueuedJob and
341     _QueuedOpCode classes.
342
343     @type job: L{_QueuedJob}
344     @param job: the job to be processed
345
346     """
347     logging.info("Worker %s processing job %s",
348                   self.worker_id, job.id)
349     proc = mcpu.Processor(self.pool.queue.context)
350     self.queue = queue = job.queue
351     try:
352       try:
353         count = len(job.ops)
354         for idx, op in enumerate(job.ops):
355           op_summary = op.input.Summary()
356           try:
357             logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
358                          op_summary)
359
360             queue.acquire()
361             try:
362               if op.status == constants.OP_STATUS_CANCELED:
363                 raise CancelJob()
364               assert op.status == constants.OP_STATUS_QUEUED
365               job.run_op_index = idx
366               op.status = constants.OP_STATUS_WAITLOCK
367               op.result = None
368               op.start_timestamp = TimeStampNow()
369               if idx == 0: # first opcode
370                 job.start_timestamp = op.start_timestamp
371               queue.UpdateJobUnlocked(job)
372
373               input_opcode = op.input
374             finally:
375               queue.release()
376
377             def _Log(*args):
378               """Append a log entry.
379
380               """
381               assert len(args) < 3
382
383               if len(args) == 1:
384                 log_type = constants.ELOG_MESSAGE
385                 log_msg = args[0]
386               else:
387                 log_type, log_msg = args
388
389               # The time is split to make serialization easier and not lose
390               # precision.
391               timestamp = utils.SplitTime(time.time())
392
393               queue.acquire()
394               try:
395                 job.log_serial += 1
396                 op.log.append((job.log_serial, timestamp, log_type, log_msg))
397
398                 job.change.notifyAll()
399               finally:
400                 queue.release()
401
402             # Make sure not to hold lock while _Log is called
403             self.opcode = op
404             result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
405
406             queue.acquire()
407             try:
408               op.status = constants.OP_STATUS_SUCCESS
409               op.result = result
410               op.end_timestamp = TimeStampNow()
411               queue.UpdateJobUnlocked(job)
412             finally:
413               queue.release()
414
415             logging.info("Op %s/%s: Successfully finished opcode %s",
416                          idx + 1, count, op_summary)
417           except CancelJob:
418             # Will be handled further up
419             raise
420           except Exception, err:
421             queue.acquire()
422             try:
423               try:
424                 op.status = constants.OP_STATUS_ERROR
425                 op.result = str(err)
426                 op.end_timestamp = TimeStampNow()
427                 logging.info("Op %s/%s: Error in opcode %s: %s",
428                              idx + 1, count, op_summary, err)
429               finally:
430                 queue.UpdateJobUnlocked(job)
431             finally:
432               queue.release()
433             raise
434
435       except CancelJob:
436         queue.acquire()
437         try:
438           queue.CancelJobUnlocked(job)
439         finally:
440           queue.release()
441       except errors.GenericError, err:
442         logging.exception("Ganeti exception")
443       except:
444         logging.exception("Unhandled exception")
445     finally:
446       queue.acquire()
447       try:
448         try:
449           job.run_op_idx = -1
450           job.end_timestamp = TimeStampNow()
451           queue.UpdateJobUnlocked(job)
452         finally:
453           job_id = job.id
454           status = job.CalcStatus()
455       finally:
456         queue.release()
457       logging.info("Worker %s finished job %s, status = %s",
458                    self.worker_id, job_id, status)
459
460
461 class _JobQueueWorkerPool(workerpool.WorkerPool):
462   """Simple class implementing a job-processing workerpool.
463
464   """
465   def __init__(self, queue):
466     super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
467                                               _JobQueueWorker)
468     self.queue = queue
469
470
471 class JobQueue(object):
472   """Queue used to manage the jobs.
473
474   @cvar _RE_JOB_FILE: regex matching the valid job file names
475
476   """
477   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
478
479   def _RequireOpenQueue(fn):
480     """Decorator for "public" functions.
481
482     This function should be used for all 'public' functions. That is,
483     functions usually called from other classes.
484
485     @warning: Use this decorator only after utils.LockedMethod!
486
487     Example::
488       @utils.LockedMethod
489       @_RequireOpenQueue
490       def Example(self):
491         pass
492
493     """
494     def wrapper(self, *args, **kwargs):
495       assert self._queue_lock is not None, "Queue should be open"
496       return fn(self, *args, **kwargs)
497     return wrapper
498
499   def __init__(self, context):
500     """Constructor for JobQueue.
501
502     The constructor will initialize the job queue object and then
503     start loading the current jobs from disk, either for starting them
504     (if they were queue) or for aborting them (if they were already
505     running).
506
507     @type context: GanetiContext
508     @param context: the context object for access to the configuration
509         data and other ganeti objects
510
511     """
512     self.context = context
513     self._memcache = weakref.WeakValueDictionary()
514     self._my_hostname = utils.HostInfo().name
515
516     # Locking
517     self._lock = threading.Lock()
518     self.acquire = self._lock.acquire
519     self.release = self._lock.release
520
521     # Initialize
522     self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
523
524     # Read serial file
525     self._last_serial = jstore.ReadSerial()
526     assert self._last_serial is not None, ("Serial file was modified between"
527                                            " check in jstore and here")
528
529     # Get initial list of nodes
530     self._nodes = dict((n.name, n.primary_ip)
531                        for n in self.context.cfg.GetAllNodesInfo().values()
532                        if n.master_candidate)
533
534     # Remove master node
535     try:
536       del self._nodes[self._my_hostname]
537     except KeyError:
538       pass
539
540     # TODO: Check consistency across nodes
541
542     # Setup worker pool
543     self._wpool = _JobQueueWorkerPool(self)
544     try:
545       # We need to lock here because WorkerPool.AddTask() may start a job while
546       # we're still doing our work.
547       self.acquire()
548       try:
549         logging.info("Inspecting job queue")
550
551         all_job_ids = self._GetJobIDsUnlocked()
552         jobs_count = len(all_job_ids)
553         lastinfo = time.time()
554         for idx, job_id in enumerate(all_job_ids):
555           # Give an update every 1000 jobs or 10 seconds
556           if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
557               idx == (jobs_count - 1)):
558             logging.info("Job queue inspection: %d/%d (%0.1f %%)",
559                          idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
560             lastinfo = time.time()
561
562           job = self._LoadJobUnlocked(job_id)
563
564           # a failure in loading the job can cause 'None' to be returned
565           if job is None:
566             continue
567
568           status = job.CalcStatus()
569
570           if status in (constants.JOB_STATUS_QUEUED, ):
571             self._wpool.AddTask(job)
572
573           elif status in (constants.JOB_STATUS_RUNNING,
574                           constants.JOB_STATUS_WAITLOCK,
575                           constants.JOB_STATUS_CANCELING):
576             logging.warning("Unfinished job %s found: %s", job.id, job)
577             try:
578               for op in job.ops:
579                 op.status = constants.OP_STATUS_ERROR
580                 op.result = "Unclean master daemon shutdown"
581             finally:
582               self.UpdateJobUnlocked(job)
583
584         logging.info("Job queue inspection finished")
585       finally:
586         self.release()
587     except:
588       self._wpool.TerminateWorkers()
589       raise
590
591   @utils.LockedMethod
592   @_RequireOpenQueue
593   def AddNode(self, node):
594     """Register a new node with the queue.
595
596     @type node: L{objects.Node}
597     @param node: the node object to be added
598
599     """
600     node_name = node.name
601     assert node_name != self._my_hostname
602
603     # Clean queue directory on added node
604     rpc.RpcRunner.call_jobqueue_purge(node_name)
605
606     if not node.master_candidate:
607       # remove if existing, ignoring errors
608       self._nodes.pop(node_name, None)
609       # and skip the replication of the job ids
610       return
611
612     # Upload the whole queue excluding archived jobs
613     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
614
615     # Upload current serial file
616     files.append(constants.JOB_QUEUE_SERIAL_FILE)
617
618     for file_name in files:
619       # Read file content
620       fd = open(file_name, "r")
621       try:
622         content = fd.read()
623       finally:
624         fd.close()
625
626       result = rpc.RpcRunner.call_jobqueue_update([node_name],
627                                                   [node.primary_ip],
628                                                   file_name, content)
629       if not result[node_name]:
630         logging.error("Failed to upload %s to %s", file_name, node_name)
631
632     self._nodes[node_name] = node.primary_ip
633
634   @utils.LockedMethod
635   @_RequireOpenQueue
636   def RemoveNode(self, node_name):
637     """Callback called when removing nodes from the cluster.
638
639     @type node_name: str
640     @param node_name: the name of the node to remove
641
642     """
643     try:
644       # The queue is removed by the "leave node" RPC call.
645       del self._nodes[node_name]
646     except KeyError:
647       pass
648
649   def _CheckRpcResult(self, result, nodes, failmsg):
650     """Verifies the status of an RPC call.
651
652     Since we aim to keep consistency should this node (the current
653     master) fail, we will log errors if our rpc fail, and especially
654     log the case when more than half of the nodes fails.
655
656     @param result: the data as returned from the rpc call
657     @type nodes: list
658     @param nodes: the list of nodes we made the call to
659     @type failmsg: str
660     @param failmsg: the identifier to be used for logging
661
662     """
663     failed = []
664     success = []
665
666     for node in nodes:
667       if result[node]:
668         success.append(node)
669       else:
670         failed.append(node)
671
672     if failed:
673       logging.error("%s failed on %s", failmsg, ", ".join(failed))
674
675     # +1 for the master node
676     if (len(success) + 1) < len(failed):
677       # TODO: Handle failing nodes
678       logging.error("More than half of the nodes failed")
679
680   def _GetNodeIp(self):
681     """Helper for returning the node name/ip list.
682
683     @rtype: (list, list)
684     @return: a tuple of two lists, the first one with the node
685         names and the second one with the node addresses
686
687     """
688     name_list = self._nodes.keys()
689     addr_list = [self._nodes[name] for name in name_list]
690     return name_list, addr_list
691
692   def _WriteAndReplicateFileUnlocked(self, file_name, data):
693     """Writes a file locally and then replicates it to all nodes.
694
695     This function will replace the contents of a file on the local
696     node and then replicate it to all the other nodes we have.
697
698     @type file_name: str
699     @param file_name: the path of the file to be replicated
700     @type data: str
701     @param data: the new contents of the file
702
703     """
704     utils.WriteFile(file_name, data=data)
705
706     names, addrs = self._GetNodeIp()
707     result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
708     self._CheckRpcResult(result, self._nodes,
709                          "Updating %s" % file_name)
710
711   def _RenameFilesUnlocked(self, rename):
712     """Renames a file locally and then replicate the change.
713
714     This function will rename a file in the local queue directory
715     and then replicate this rename to all the other nodes we have.
716
717     @type rename: list of (old, new)
718     @param rename: List containing tuples mapping old to new names
719
720     """
721     # Rename them locally
722     for old, new in rename:
723       utils.RenameFile(old, new, mkdir=True)
724
725     # ... and on all nodes
726     names, addrs = self._GetNodeIp()
727     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
728     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
729
730   def _FormatJobID(self, job_id):
731     """Convert a job ID to string format.
732
733     Currently this just does C{str(job_id)} after performing some
734     checks, but if we want to change the job id format this will
735     abstract this change.
736
737     @type job_id: int or long
738     @param job_id: the numeric job id
739     @rtype: str
740     @return: the formatted job id
741
742     """
743     if not isinstance(job_id, (int, long)):
744       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
745     if job_id < 0:
746       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
747
748     return str(job_id)
749
750   @classmethod
751   def _GetArchiveDirectory(cls, job_id):
752     """Returns the archive directory for a job.
753
754     @type job_id: str
755     @param job_id: Job identifier
756     @rtype: str
757     @return: Directory name
758
759     """
760     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
761
762   def _NewSerialUnlocked(self):
763     """Generates a new job identifier.
764
765     Job identifiers are unique during the lifetime of a cluster.
766
767     @rtype: str
768     @return: a string representing the job identifier.
769
770     """
771     # New number
772     serial = self._last_serial + 1
773
774     # Write to file
775     self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
776                                         "%s\n" % serial)
777
778     # Keep it only if we were able to write the file
779     self._last_serial = serial
780
781     return self._FormatJobID(serial)
782
783   @staticmethod
784   def _GetJobPath(job_id):
785     """Returns the job file for a given job id.
786
787     @type job_id: str
788     @param job_id: the job identifier
789     @rtype: str
790     @return: the path to the job file
791
792     """
793     return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
794
795   @classmethod
796   def _GetArchivedJobPath(cls, job_id):
797     """Returns the archived job file for a give job id.
798
799     @type job_id: str
800     @param job_id: the job identifier
801     @rtype: str
802     @return: the path to the archived job file
803
804     """
805     path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
806     return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
807
808   @classmethod
809   def _ExtractJobID(cls, name):
810     """Extract the job id from a filename.
811
812     @type name: str
813     @param name: the job filename
814     @rtype: job id or None
815     @return: the job id corresponding to the given filename,
816         or None if the filename does not represent a valid
817         job file
818
819     """
820     m = cls._RE_JOB_FILE.match(name)
821     if m:
822       return m.group(1)
823     else:
824       return None
825
826   def _GetJobIDsUnlocked(self, archived=False):
827     """Return all known job IDs.
828
829     If the parameter archived is True, archived jobs IDs will be
830     included. Currently this argument is unused.
831
832     The method only looks at disk because it's a requirement that all
833     jobs are present on disk (so in the _memcache we don't have any
834     extra IDs).
835
836     @rtype: list
837     @return: the list of job IDs
838
839     """
840     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
841     jlist = utils.NiceSort(jlist)
842     return jlist
843
844   def _ListJobFiles(self):
845     """Returns the list of current job files.
846
847     @rtype: list
848     @return: the list of job file names
849
850     """
851     return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
852             if self._RE_JOB_FILE.match(name)]
853
854   def _LoadJobUnlocked(self, job_id):
855     """Loads a job from the disk or memory.
856
857     Given a job id, this will return the cached job object if
858     existing, or try to load the job from the disk. If loading from
859     disk, it will also add the job to the cache.
860
861     @param job_id: the job id
862     @rtype: L{_QueuedJob} or None
863     @return: either None or the job object
864
865     """
866     job = self._memcache.get(job_id, None)
867     if job:
868       logging.debug("Found job %s in memcache", job_id)
869       return job
870
871     filepath = self._GetJobPath(job_id)
872     logging.debug("Loading job from %s", filepath)
873     try:
874       fd = open(filepath, "r")
875     except IOError, err:
876       if err.errno in (errno.ENOENT, ):
877         return None
878       raise
879     try:
880       data = serializer.LoadJson(fd.read())
881     finally:
882       fd.close()
883
884     try:
885       job = _QueuedJob.Restore(self, data)
886     except Exception, err:
887       new_path = self._GetArchivedJobPath(job_id)
888       if filepath == new_path:
889         # job already archived (future case)
890         logging.exception("Can't parse job %s", job_id)
891       else:
892         # non-archived case
893         logging.exception("Can't parse job %s, will archive.", job_id)
894         self._RenameFilesUnlocked([(filepath, new_path)])
895       return None
896
897     self._memcache[job_id] = job
898     logging.debug("Added job %s to the cache", job_id)
899     return job
900
901   def _GetJobsUnlocked(self, job_ids):
902     """Return a list of jobs based on their IDs.
903
904     @type job_ids: list
905     @param job_ids: either an empty list (meaning all jobs),
906         or a list of job IDs
907     @rtype: list
908     @return: the list of job objects
909
910     """
911     if not job_ids:
912       job_ids = self._GetJobIDsUnlocked()
913
914     return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
915
916   @staticmethod
917   def _IsQueueMarkedDrain():
918     """Check if the queue is marked from drain.
919
920     This currently uses the queue drain file, which makes it a
921     per-node flag. In the future this can be moved to the config file.
922
923     @rtype: boolean
924     @return: True of the job queue is marked for draining
925
926     """
927     return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
928
929   @staticmethod
930   def SetDrainFlag(drain_flag):
931     """Sets the drain flag for the queue.
932
933     This is similar to the function L{backend.JobQueueSetDrainFlag},
934     and in the future we might merge them.
935
936     @type drain_flag: boolean
937     @param drain_flag: Whether to set or unset the drain flag
938
939     """
940     if drain_flag:
941       utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
942     else:
943       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
944     return True
945
946   @utils.LockedMethod
947   @_RequireOpenQueue
948   def SubmitJob(self, ops):
949     """Create and store a new job.
950
951     This enters the job into our job queue and also puts it on the new
952     queue, in order for it to be picked up by the queue processors.
953
954     @type ops: list
955     @param ops: The list of OpCodes that will become the new job.
956     @rtype: job ID
957     @return: the job ID of the newly created job
958     @raise errors.JobQueueDrainError: if the job is marked for draining
959
960     """
961     if self._IsQueueMarkedDrain():
962       raise errors.JobQueueDrainError()
963
964     # Check job queue size
965     size = len(self._ListJobFiles())
966     if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
967       # TODO: Autoarchive jobs. Make sure it's not done on every job
968       # submission, though.
969       #size = ...
970       pass
971
972     if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
973       raise errors.JobQueueFull()
974
975     # Get job identifier
976     job_id = self._NewSerialUnlocked()
977     job = _QueuedJob(self, job_id, ops)
978
979     # Write to disk
980     self.UpdateJobUnlocked(job)
981
982     logging.debug("Adding new job %s to the cache", job_id)
983     self._memcache[job_id] = job
984
985     # Add to worker pool
986     self._wpool.AddTask(job)
987
988     return job.id
989
990   @_RequireOpenQueue
991   def UpdateJobUnlocked(self, job):
992     """Update a job's on disk storage.
993
994     After a job has been modified, this function needs to be called in
995     order to write the changes to disk and replicate them to the other
996     nodes.
997
998     @type job: L{_QueuedJob}
999     @param job: the changed job
1000
1001     """
1002     filename = self._GetJobPath(job.id)
1003     data = serializer.DumpJson(job.Serialize(), indent=False)
1004     logging.debug("Writing job %s to %s", job.id, filename)
1005     self._WriteAndReplicateFileUnlocked(filename, data)
1006
1007     # Notify waiters about potential changes
1008     job.change.notifyAll()
1009
1010   @utils.LockedMethod
1011   @_RequireOpenQueue
1012   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1013                         timeout):
1014     """Waits for changes in a job.
1015
1016     @type job_id: string
1017     @param job_id: Job identifier
1018     @type fields: list of strings
1019     @param fields: Which fields to check for changes
1020     @type prev_job_info: list or None
1021     @param prev_job_info: Last job information returned
1022     @type prev_log_serial: int
1023     @param prev_log_serial: Last job message serial number
1024     @type timeout: float
1025     @param timeout: maximum time to wait
1026     @rtype: tuple (job info, log entries)
1027     @return: a tuple of the job information as required via
1028         the fields parameter, and the log entries as a list
1029
1030         if the job has not changed and the timeout has expired,
1031         we instead return a special value,
1032         L{constants.JOB_NOTCHANGED}, which should be interpreted
1033         as such by the clients
1034
1035     """
1036     logging.debug("Waiting for changes in job %s", job_id)
1037     end_time = time.time() + timeout
1038     while True:
1039       delta_time = end_time - time.time()
1040       if delta_time < 0:
1041         return constants.JOB_NOTCHANGED
1042
1043       job = self._LoadJobUnlocked(job_id)
1044       if not job:
1045         logging.debug("Job %s not found", job_id)
1046         break
1047
1048       status = job.CalcStatus()
1049       job_info = self._GetJobInfoUnlocked(job, fields)
1050       log_entries = job.GetLogEntries(prev_log_serial)
1051
1052       # Serializing and deserializing data can cause type changes (e.g. from
1053       # tuple to list) or precision loss. We're doing it here so that we get
1054       # the same modifications as the data received from the client. Without
1055       # this, the comparison afterwards might fail without the data being
1056       # significantly different.
1057       job_info = serializer.LoadJson(serializer.DumpJson(job_info))
1058       log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
1059
1060       if status not in (constants.JOB_STATUS_QUEUED,
1061                         constants.JOB_STATUS_RUNNING,
1062                         constants.JOB_STATUS_WAITLOCK):
1063         # Don't even try to wait if the job is no longer running, there will be
1064         # no changes.
1065         break
1066
1067       if (prev_job_info != job_info or
1068           (log_entries and prev_log_serial != log_entries[0][0])):
1069         break
1070
1071       logging.debug("Waiting again")
1072
1073       # Release the queue lock while waiting
1074       job.change.wait(delta_time)
1075
1076     logging.debug("Job %s changed", job_id)
1077
1078     return (job_info, log_entries)
1079
1080   @utils.LockedMethod
1081   @_RequireOpenQueue
1082   def CancelJob(self, job_id):
1083     """Cancels a job.
1084
1085     This will only succeed if the job has not started yet.
1086
1087     @type job_id: string
1088     @param job_id: job ID of job to be cancelled.
1089
1090     """
1091     logging.info("Cancelling job %s", job_id)
1092
1093     job = self._LoadJobUnlocked(job_id)
1094     if not job:
1095       logging.debug("Job %s not found", job_id)
1096       return (False, "Job %s not found" % job_id)
1097
1098     job_status = job.CalcStatus()
1099
1100     if job_status not in (constants.JOB_STATUS_QUEUED,
1101                           constants.JOB_STATUS_WAITLOCK):
1102       logging.debug("Job %s is no longer in the queue", job.id)
1103       return (False, "Job %s is no longer in the queue" % job.id)
1104
1105     if job_status == constants.JOB_STATUS_QUEUED:
1106       self.CancelJobUnlocked(job)
1107       return (True, "Job %s canceled" % job.id)
1108
1109     elif job_status == constants.JOB_STATUS_WAITLOCK:
1110       # The worker will notice the new status and cancel the job
1111       try:
1112         for op in job.ops:
1113           op.status = constants.OP_STATUS_CANCELING
1114       finally:
1115         self.UpdateJobUnlocked(job)
1116       return (True, "Job %s will be canceled" % job.id)
1117
1118   @_RequireOpenQueue
1119   def CancelJobUnlocked(self, job):
1120     """Marks a job as canceled.
1121
1122     """
1123     try:
1124       for op in job.ops:
1125         op.status = constants.OP_STATUS_CANCELED
1126         op.result = "Job canceled by request"
1127     finally:
1128       self.UpdateJobUnlocked(job)
1129
1130   @_RequireOpenQueue
1131   def _ArchiveJobsUnlocked(self, jobs):
1132     """Archives jobs.
1133
1134     @type jobs: list of L{_QueuedJob}
1135     @param jobs: Job objects
1136     @rtype: int
1137     @return: Number of archived jobs
1138
1139     """
1140     archive_jobs = []
1141     rename_files = []
1142     for job in jobs:
1143       if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1144                                   constants.JOB_STATUS_SUCCESS,
1145                                   constants.JOB_STATUS_ERROR):
1146         logging.debug("Job %s is not yet done", job.id)
1147         continue
1148
1149       archive_jobs.append(job)
1150
1151       old = self._GetJobPath(job.id)
1152       new = self._GetArchivedJobPath(job.id)
1153       rename_files.append((old, new))
1154
1155     # TODO: What if 1..n files fail to rename?
1156     self._RenameFilesUnlocked(rename_files)
1157
1158     logging.debug("Successfully archived job(s) %s",
1159                   ", ".join(job.id for job in archive_jobs))
1160
1161     return len(archive_jobs)
1162
1163   @utils.LockedMethod
1164   @_RequireOpenQueue
1165   def ArchiveJob(self, job_id):
1166     """Archives a job.
1167
1168     This is just a wrapper over L{_ArchiveJobsUnlocked}.
1169
1170     @type job_id: string
1171     @param job_id: Job ID of job to be archived.
1172     @rtype: bool
1173     @return: Whether job was archived
1174
1175     """
1176     logging.info("Archiving job %s", job_id)
1177
1178     job = self._LoadJobUnlocked(job_id)
1179     if not job:
1180       logging.debug("Job %s not found", job_id)
1181       return False
1182
1183     return self._ArchiveJobsUnlocked([job]) == 1
1184
1185   @utils.LockedMethod
1186   @_RequireOpenQueue
1187   def AutoArchiveJobs(self, age, timeout):
1188     """Archives all jobs based on age.
1189
1190     The method will archive all jobs which are older than the age
1191     parameter. For jobs that don't have an end timestamp, the start
1192     timestamp will be considered. The special '-1' age will cause
1193     archival of all jobs (that are not running or queued).
1194
1195     @type age: int
1196     @param age: the minimum age in seconds
1197
1198     """
1199     logging.info("Archiving jobs with age more than %s seconds", age)
1200
1201     now = time.time()
1202     end_time = now + timeout
1203     archived_count = 0
1204     last_touched = 0
1205
1206     all_job_ids = self._GetJobIDsUnlocked(archived=False)
1207     pending = []
1208     for idx, job_id in enumerate(all_job_ids):
1209       last_touched = idx
1210
1211       # Not optimal because jobs could be pending
1212       # TODO: Measure average duration for job archival and take number of
1213       # pending jobs into account.
1214       if time.time() > end_time:
1215         break
1216
1217       # Returns None if the job failed to load
1218       job = self._LoadJobUnlocked(job_id)
1219       if job:
1220         if job.end_timestamp is None:
1221           if job.start_timestamp is None:
1222             job_age = job.received_timestamp
1223           else:
1224             job_age = job.start_timestamp
1225         else:
1226           job_age = job.end_timestamp
1227
1228         if age == -1 or now - job_age[0] > age:
1229           pending.append(job)
1230
1231           # Archive 10 jobs at a time
1232           if len(pending) >= 10:
1233             archived_count += self._ArchiveJobsUnlocked(pending)
1234             pending = []
1235
1236     if pending:
1237       archived_count += self._ArchiveJobsUnlocked(pending)
1238
1239     return (archived_count, len(all_job_ids) - last_touched - 1)
1240
1241   def _GetJobInfoUnlocked(self, job, fields):
1242     """Returns information about a job.
1243
1244     @type job: L{_QueuedJob}
1245     @param job: the job which we query
1246     @type fields: list
1247     @param fields: names of fields to return
1248     @rtype: list
1249     @return: list with one element for each field
1250     @raise errors.OpExecError: when an invalid field
1251         has been passed
1252
1253     """
1254     row = []
1255     for fname in fields:
1256       if fname == "id":
1257         row.append(job.id)
1258       elif fname == "status":
1259         row.append(job.CalcStatus())
1260       elif fname == "ops":
1261         row.append([op.input.__getstate__() for op in job.ops])
1262       elif fname == "opresult":
1263         row.append([op.result for op in job.ops])
1264       elif fname == "opstatus":
1265         row.append([op.status for op in job.ops])
1266       elif fname == "oplog":
1267         row.append([op.log for op in job.ops])
1268       elif fname == "opstart":
1269         row.append([op.start_timestamp for op in job.ops])
1270       elif fname == "opend":
1271         row.append([op.end_timestamp for op in job.ops])
1272       elif fname == "received_ts":
1273         row.append(job.received_timestamp)
1274       elif fname == "start_ts":
1275         row.append(job.start_timestamp)
1276       elif fname == "end_ts":
1277         row.append(job.end_timestamp)
1278       elif fname == "summary":
1279         row.append([op.input.Summary() for op in job.ops])
1280       else:
1281         raise errors.OpExecError("Invalid job query field '%s'" % fname)
1282     return row
1283
1284   @utils.LockedMethod
1285   @_RequireOpenQueue
1286   def QueryJobs(self, job_ids, fields):
1287     """Returns a list of jobs in queue.
1288
1289     This is a wrapper of L{_GetJobsUnlocked}, which actually does the
1290     processing for each job.
1291
1292     @type job_ids: list
1293     @param job_ids: sequence of job identifiers or None for all
1294     @type fields: list
1295     @param fields: names of fields to return
1296     @rtype: list
1297     @return: list one element per job, each element being list with
1298         the requested fields
1299
1300     """
1301     jobs = []
1302
1303     for job in self._GetJobsUnlocked(job_ids):
1304       if job is None:
1305         jobs.append(None)
1306       else:
1307         jobs.append(self._GetJobInfoUnlocked(job, fields))
1308
1309     return jobs
1310
1311   @utils.LockedMethod
1312   @_RequireOpenQueue
1313   def Shutdown(self):
1314     """Stops the job queue.
1315
1316     This shutdowns all the worker threads an closes the queue.
1317
1318     """
1319     self._wpool.TerminateWorkers()
1320
1321     self._queue_lock.Close()
1322     self._queue_lock = None