Convert snapshot_export rpc to new style
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import os
33 import logging
34 import threading
35 import errno
36 import re
37 import time
38 import weakref
39
40 from ganeti import constants
41 from ganeti import serializer
42 from ganeti import workerpool
43 from ganeti import opcodes
44 from ganeti import errors
45 from ganeti import mcpu
46 from ganeti import utils
47 from ganeti import jstore
48 from ganeti import rpc
49
50
51 JOBQUEUE_THREADS = 25
52 JOBS_PER_ARCHIVE_DIRECTORY = 10000
53
54
55 class CancelJob(Exception):
56   """Special exception to cancel a job.
57
58   """
59
60
61 def TimeStampNow():
62   """Returns the current timestamp.
63
64   @rtype: tuple
65   @return: the current time in the (seconds, microseconds) format
66
67   """
68   return utils.SplitTime(time.time())
69
70
71 class _QueuedOpCode(object):
72   """Encasulates an opcode object.
73
74   @ivar log: holds the execution log and consists of tuples
75   of the form C{(log_serial, timestamp, level, message)}
76   @ivar input: the OpCode we encapsulate
77   @ivar status: the current status
78   @ivar result: the result of the LU execution
79   @ivar start_timestamp: timestamp for the start of the execution
80   @ivar stop_timestamp: timestamp for the end of the execution
81
82   """
83   def __init__(self, op):
84     """Constructor for the _QuededOpCode.
85
86     @type op: L{opcodes.OpCode}
87     @param op: the opcode we encapsulate
88
89     """
90     self.input = op
91     self.status = constants.OP_STATUS_QUEUED
92     self.result = None
93     self.log = []
94     self.start_timestamp = None
95     self.end_timestamp = None
96
97   @classmethod
98   def Restore(cls, state):
99     """Restore the _QueuedOpCode from the serialized form.
100
101     @type state: dict
102     @param state: the serialized state
103     @rtype: _QueuedOpCode
104     @return: a new _QueuedOpCode instance
105
106     """
107     obj = _QueuedOpCode.__new__(cls)
108     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
109     obj.status = state["status"]
110     obj.result = state["result"]
111     obj.log = state["log"]
112     obj.start_timestamp = state.get("start_timestamp", None)
113     obj.end_timestamp = state.get("end_timestamp", None)
114     return obj
115
116   def Serialize(self):
117     """Serializes this _QueuedOpCode.
118
119     @rtype: dict
120     @return: the dictionary holding the serialized state
121
122     """
123     return {
124       "input": self.input.__getstate__(),
125       "status": self.status,
126       "result": self.result,
127       "log": self.log,
128       "start_timestamp": self.start_timestamp,
129       "end_timestamp": self.end_timestamp,
130       }
131
132
133 class _QueuedJob(object):
134   """In-memory job representation.
135
136   This is what we use to track the user-submitted jobs. Locking must
137   be taken care of by users of this class.
138
139   @type queue: L{JobQueue}
140   @ivar queue: the parent queue
141   @ivar id: the job ID
142   @type ops: list
143   @ivar ops: the list of _QueuedOpCode that constitute the job
144   @type run_op_index: int
145   @ivar run_op_index: the currently executing opcode, or -1 if
146       we didn't yet start executing
147   @type log_serial: int
148   @ivar log_serial: holds the index for the next log entry
149   @ivar received_timestamp: the timestamp for when the job was received
150   @ivar start_timestmap: the timestamp for start of execution
151   @ivar end_timestamp: the timestamp for end of execution
152   @ivar change: a Condition variable we use for waiting for job changes
153
154   """
155   def __init__(self, queue, job_id, ops):
156     """Constructor for the _QueuedJob.
157
158     @type queue: L{JobQueue}
159     @param queue: our parent queue
160     @type job_id: job_id
161     @param job_id: our job id
162     @type ops: list
163     @param ops: the list of opcodes we hold, which will be encapsulated
164         in _QueuedOpCodes
165
166     """
167     if not ops:
168       # TODO: use a better exception
169       raise Exception("No opcodes")
170
171     self.queue = queue
172     self.id = job_id
173     self.ops = [_QueuedOpCode(op) for op in ops]
174     self.run_op_index = -1
175     self.log_serial = 0
176     self.received_timestamp = TimeStampNow()
177     self.start_timestamp = None
178     self.end_timestamp = None
179
180     # Condition to wait for changes
181     self.change = threading.Condition(self.queue._lock)
182
183   @classmethod
184   def Restore(cls, queue, state):
185     """Restore a _QueuedJob from serialized state:
186
187     @type queue: L{JobQueue}
188     @param queue: to which queue the restored job belongs
189     @type state: dict
190     @param state: the serialized state
191     @rtype: _JobQueue
192     @return: the restored _JobQueue instance
193
194     """
195     obj = _QueuedJob.__new__(cls)
196     obj.queue = queue
197     obj.id = state["id"]
198     obj.run_op_index = state["run_op_index"]
199     obj.received_timestamp = state.get("received_timestamp", None)
200     obj.start_timestamp = state.get("start_timestamp", None)
201     obj.end_timestamp = state.get("end_timestamp", None)
202
203     obj.ops = []
204     obj.log_serial = 0
205     for op_state in state["ops"]:
206       op = _QueuedOpCode.Restore(op_state)
207       for log_entry in op.log:
208         obj.log_serial = max(obj.log_serial, log_entry[0])
209       obj.ops.append(op)
210
211     # Condition to wait for changes
212     obj.change = threading.Condition(obj.queue._lock)
213
214     return obj
215
216   def Serialize(self):
217     """Serialize the _JobQueue instance.
218
219     @rtype: dict
220     @return: the serialized state
221
222     """
223     return {
224       "id": self.id,
225       "ops": [op.Serialize() for op in self.ops],
226       "run_op_index": self.run_op_index,
227       "start_timestamp": self.start_timestamp,
228       "end_timestamp": self.end_timestamp,
229       "received_timestamp": self.received_timestamp,
230       }
231
232   def CalcStatus(self):
233     """Compute the status of this job.
234
235     This function iterates over all the _QueuedOpCodes in the job and
236     based on their status, computes the job status.
237
238     The algorithm is:
239       - if we find a cancelled, or finished with error, the job
240         status will be the same
241       - otherwise, the last opcode with the status one of:
242           - waitlock
243           - canceling
244           - running
245
246         will determine the job status
247
248       - otherwise, it means either all opcodes are queued, or success,
249         and the job status will be the same
250
251     @return: the job status
252
253     """
254     status = constants.JOB_STATUS_QUEUED
255
256     all_success = True
257     for op in self.ops:
258       if op.status == constants.OP_STATUS_SUCCESS:
259         continue
260
261       all_success = False
262
263       if op.status == constants.OP_STATUS_QUEUED:
264         pass
265       elif op.status == constants.OP_STATUS_WAITLOCK:
266         status = constants.JOB_STATUS_WAITLOCK
267       elif op.status == constants.OP_STATUS_RUNNING:
268         status = constants.JOB_STATUS_RUNNING
269       elif op.status == constants.OP_STATUS_CANCELING:
270         status = constants.JOB_STATUS_CANCELING
271         break
272       elif op.status == constants.OP_STATUS_ERROR:
273         status = constants.JOB_STATUS_ERROR
274         # The whole job fails if one opcode failed
275         break
276       elif op.status == constants.OP_STATUS_CANCELED:
277         status = constants.OP_STATUS_CANCELED
278         break
279
280     if all_success:
281       status = constants.JOB_STATUS_SUCCESS
282
283     return status
284
285   def GetLogEntries(self, newer_than):
286     """Selectively returns the log entries.
287
288     @type newer_than: None or int
289     @param newer_than: if this is None, return all log enties,
290         otherwise return only the log entries with serial higher
291         than this value
292     @rtype: list
293     @return: the list of the log entries selected
294
295     """
296     if newer_than is None:
297       serial = -1
298     else:
299       serial = newer_than
300
301     entries = []
302     for op in self.ops:
303       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
304
305     return entries
306
307
308 class _JobQueueWorker(workerpool.BaseWorker):
309   """The actual job workers.
310
311   """
312   def _NotifyStart(self):
313     """Mark the opcode as running, not lock-waiting.
314
315     This is called from the mcpu code as a notifier function, when the
316     LU is finally about to start the Exec() method. Of course, to have
317     end-user visible results, the opcode must be initially (before
318     calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
319
320     """
321     assert self.queue, "Queue attribute is missing"
322     assert self.opcode, "Opcode attribute is missing"
323
324     self.queue.acquire()
325     try:
326       assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
327                                     constants.OP_STATUS_CANCELING)
328
329       # Cancel here if we were asked to
330       if self.opcode.status == constants.OP_STATUS_CANCELING:
331         raise CancelJob()
332
333       self.opcode.status = constants.OP_STATUS_RUNNING
334     finally:
335       self.queue.release()
336
337   def RunTask(self, job):
338     """Job executor.
339
340     This functions processes a job. It is closely tied to the _QueuedJob and
341     _QueuedOpCode classes.
342
343     @type job: L{_QueuedJob}
344     @param job: the job to be processed
345
346     """
347     logging.info("Worker %s processing job %s",
348                   self.worker_id, job.id)
349     proc = mcpu.Processor(self.pool.queue.context)
350     self.queue = queue = job.queue
351     try:
352       try:
353         count = len(job.ops)
354         for idx, op in enumerate(job.ops):
355           op_summary = op.input.Summary()
356           try:
357             logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
358                          op_summary)
359
360             queue.acquire()
361             try:
362               if op.status == constants.OP_STATUS_CANCELED:
363                 raise CancelJob()
364               assert op.status == constants.OP_STATUS_QUEUED
365               job.run_op_index = idx
366               op.status = constants.OP_STATUS_WAITLOCK
367               op.result = None
368               op.start_timestamp = TimeStampNow()
369               if idx == 0: # first opcode
370                 job.start_timestamp = op.start_timestamp
371               queue.UpdateJobUnlocked(job)
372
373               input_opcode = op.input
374             finally:
375               queue.release()
376
377             def _Log(*args):
378               """Append a log entry.
379
380               """
381               assert len(args) < 3
382
383               if len(args) == 1:
384                 log_type = constants.ELOG_MESSAGE
385                 log_msg = args[0]
386               else:
387                 log_type, log_msg = args
388
389               # The time is split to make serialization easier and not lose
390               # precision.
391               timestamp = utils.SplitTime(time.time())
392
393               queue.acquire()
394               try:
395                 job.log_serial += 1
396                 op.log.append((job.log_serial, timestamp, log_type, log_msg))
397
398                 job.change.notifyAll()
399               finally:
400                 queue.release()
401
402             # Make sure not to hold lock while _Log is called
403             self.opcode = op
404             result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
405
406             queue.acquire()
407             try:
408               op.status = constants.OP_STATUS_SUCCESS
409               op.result = result
410               op.end_timestamp = TimeStampNow()
411               queue.UpdateJobUnlocked(job)
412             finally:
413               queue.release()
414
415             logging.info("Op %s/%s: Successfully finished opcode %s",
416                          idx + 1, count, op_summary)
417           except CancelJob:
418             # Will be handled further up
419             raise
420           except Exception, err:
421             queue.acquire()
422             try:
423               try:
424                 op.status = constants.OP_STATUS_ERROR
425                 op.result = str(err)
426                 op.end_timestamp = TimeStampNow()
427                 logging.info("Op %s/%s: Error in opcode %s: %s",
428                              idx + 1, count, op_summary, err)
429               finally:
430                 queue.UpdateJobUnlocked(job)
431             finally:
432               queue.release()
433             raise
434
435       except CancelJob:
436         queue.acquire()
437         try:
438           queue.CancelJobUnlocked(job)
439         finally:
440           queue.release()
441       except errors.GenericError, err:
442         logging.exception("Ganeti exception")
443       except:
444         logging.exception("Unhandled exception")
445     finally:
446       queue.acquire()
447       try:
448         try:
449           job.run_op_idx = -1
450           job.end_timestamp = TimeStampNow()
451           queue.UpdateJobUnlocked(job)
452         finally:
453           job_id = job.id
454           status = job.CalcStatus()
455       finally:
456         queue.release()
457       logging.info("Worker %s finished job %s, status = %s",
458                    self.worker_id, job_id, status)
459
460
461 class _JobQueueWorkerPool(workerpool.WorkerPool):
462   """Simple class implementing a job-processing workerpool.
463
464   """
465   def __init__(self, queue):
466     super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
467                                               _JobQueueWorker)
468     self.queue = queue
469
470
471 class JobQueue(object):
472   """Quue used to manaage the jobs.
473
474   @cvar _RE_JOB_FILE: regex matching the valid job file names
475
476   """
477   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
478
479   def _RequireOpenQueue(fn):
480     """Decorator for "public" functions.
481
482     This function should be used for all 'public' functions. That is,
483     functions usually called from other classes.
484
485     @warning: Use this decorator only after utils.LockedMethod!
486
487     Example::
488       @utils.LockedMethod
489       @_RequireOpenQueue
490       def Example(self):
491         pass
492
493     """
494     def wrapper(self, *args, **kwargs):
495       assert self._queue_lock is not None, "Queue should be open"
496       return fn(self, *args, **kwargs)
497     return wrapper
498
499   def __init__(self, context):
500     """Constructor for JobQueue.
501
502     The constructor will initialize the job queue object and then
503     start loading the current jobs from disk, either for starting them
504     (if they were queue) or for aborting them (if they were already
505     running).
506
507     @type context: GanetiContext
508     @param context: the context object for access to the configuration
509         data and other ganeti objects
510
511     """
512     self.context = context
513     self._memcache = weakref.WeakValueDictionary()
514     self._my_hostname = utils.HostInfo().name
515
516     # Locking
517     self._lock = threading.Lock()
518     self.acquire = self._lock.acquire
519     self.release = self._lock.release
520
521     # Initialize
522     self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
523
524     # Read serial file
525     self._last_serial = jstore.ReadSerial()
526     assert self._last_serial is not None, ("Serial file was modified between"
527                                            " check in jstore and here")
528
529     # Get initial list of nodes
530     self._nodes = dict((n.name, n.primary_ip)
531                        for n in self.context.cfg.GetAllNodesInfo().values()
532                        if n.master_candidate)
533
534     # Remove master node
535     try:
536       del self._nodes[self._my_hostname]
537     except KeyError:
538       pass
539
540     # TODO: Check consistency across nodes
541
542     # Setup worker pool
543     self._wpool = _JobQueueWorkerPool(self)
544     try:
545       # We need to lock here because WorkerPool.AddTask() may start a job while
546       # we're still doing our work.
547       self.acquire()
548       try:
549         logging.info("Inspecting job queue")
550
551         all_job_ids = self._GetJobIDsUnlocked()
552         jobs_count = len(all_job_ids)
553         lastinfo = time.time()
554         for idx, job_id in enumerate(all_job_ids):
555           # Give an update every 1000 jobs or 10 seconds
556           if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
557               idx == (jobs_count - 1)):
558             logging.info("Job queue inspection: %d/%d (%0.1f %%)",
559                          idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
560             lastinfo = time.time()
561
562           job = self._LoadJobUnlocked(job_id)
563
564           # a failure in loading the job can cause 'None' to be returned
565           if job is None:
566             continue
567
568           status = job.CalcStatus()
569
570           if status in (constants.JOB_STATUS_QUEUED, ):
571             self._wpool.AddTask(job)
572
573           elif status in (constants.JOB_STATUS_RUNNING,
574                           constants.JOB_STATUS_WAITLOCK,
575                           constants.JOB_STATUS_CANCELING):
576             logging.warning("Unfinished job %s found: %s", job.id, job)
577             try:
578               for op in job.ops:
579                 op.status = constants.OP_STATUS_ERROR
580                 op.result = "Unclean master daemon shutdown"
581             finally:
582               self.UpdateJobUnlocked(job)
583
584         logging.info("Job queue inspection finished")
585       finally:
586         self.release()
587     except:
588       self._wpool.TerminateWorkers()
589       raise
590
591   @utils.LockedMethod
592   @_RequireOpenQueue
593   def AddNode(self, node):
594     """Register a new node with the queue.
595
596     @type node: L{objects.Node}
597     @param node: the node object to be added
598
599     """
600     node_name = node.name
601     assert node_name != self._my_hostname
602
603     # Clean queue directory on added node
604     rpc.RpcRunner.call_jobqueue_purge(node_name)
605
606     if not node.master_candidate:
607       # remove if existing, ignoring errors
608       self._nodes.pop(node_name, None)
609       # and skip the replication of the job ids
610       return
611
612     # Upload the whole queue excluding archived jobs
613     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
614
615     # Upload current serial file
616     files.append(constants.JOB_QUEUE_SERIAL_FILE)
617
618     for file_name in files:
619       # Read file content
620       fd = open(file_name, "r")
621       try:
622         content = fd.read()
623       finally:
624         fd.close()
625
626       result = rpc.RpcRunner.call_jobqueue_update([node_name],
627                                                   [node.primary_ip],
628                                                   file_name, content)
629       if not result[node_name]:
630         logging.error("Failed to upload %s to %s", file_name, node_name)
631
632     self._nodes[node_name] = node.primary_ip
633
634   @utils.LockedMethod
635   @_RequireOpenQueue
636   def RemoveNode(self, node_name):
637     """Callback called when removing nodes from the cluster.
638
639     @type node_name: str
640     @param node_name: the name of the node to remove
641
642     """
643     try:
644       # The queue is removed by the "leave node" RPC call.
645       del self._nodes[node_name]
646     except KeyError:
647       pass
648
649   def _CheckRpcResult(self, result, nodes, failmsg):
650     """Verifies the status of an RPC call.
651
652     Since we aim to keep consistency should this node (the current
653     master) fail, we will log errors if our rpc fail, and especially
654     log the case when more than half of the nodes failes.
655
656     @param result: the data as returned from the rpc call
657     @type nodes: list
658     @param nodes: the list of nodes we made the call to
659     @type failmsg: str
660     @param failmsg: the identifier to be used for logging
661
662     """
663     failed = []
664     success = []
665
666     for node in nodes:
667       if result[node]:
668         success.append(node)
669       else:
670         failed.append(node)
671
672     if failed:
673       logging.error("%s failed on %s", failmsg, ", ".join(failed))
674
675     # +1 for the master node
676     if (len(success) + 1) < len(failed):
677       # TODO: Handle failing nodes
678       logging.error("More than half of the nodes failed")
679
680   def _GetNodeIp(self):
681     """Helper for returning the node name/ip list.
682
683     @rtype: (list, list)
684     @return: a tuple of two lists, the first one with the node
685         names and the second one with the node addresses
686
687     """
688     name_list = self._nodes.keys()
689     addr_list = [self._nodes[name] for name in name_list]
690     return name_list, addr_list
691
692   def _WriteAndReplicateFileUnlocked(self, file_name, data):
693     """Writes a file locally and then replicates it to all nodes.
694
695     This function will replace the contents of a file on the local
696     node and then replicate it to all the other nodes we have.
697
698     @type file_name: str
699     @param file_name: the path of the file to be replicated
700     @type data: str
701     @param data: the new contents of the file
702
703     """
704     utils.WriteFile(file_name, data=data)
705
706     names, addrs = self._GetNodeIp()
707     result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
708     self._CheckRpcResult(result, self._nodes,
709                          "Updating %s" % file_name)
710
711   def _RenameFilesUnlocked(self, rename):
712     """Renames a file locally and then replicate the change.
713
714     This function will rename a file in the local queue directory
715     and then replicate this rename to all the other nodes we have.
716
717     @type rename: list of (old, new)
718     @param rename: List containing tuples mapping old to new names
719
720     """
721     # Rename them locally
722     for old, new in rename:
723       utils.RenameFile(old, new, mkdir=True)
724
725     # ... and on all nodes
726     names, addrs = self._GetNodeIp()
727     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
728     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
729
730   def _FormatJobID(self, job_id):
731     """Convert a job ID to string format.
732
733     Currently this just does C{str(job_id)} after performing some
734     checks, but if we want to change the job id format this will
735     abstract this change.
736
737     @type job_id: int or long
738     @param job_id: the numeric job id
739     @rtype: str
740     @return: the formatted job id
741
742     """
743     if not isinstance(job_id, (int, long)):
744       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
745     if job_id < 0:
746       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
747
748     return str(job_id)
749
750   @classmethod
751   def _GetArchiveDirectory(cls, job_id):
752     """Returns the archive directory for a job.
753
754     @type job_id: str
755     @param job_id: Job identifier
756     @rtype: str
757     @return: Directory name
758
759     """
760     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
761
762   def _NewSerialUnlocked(self):
763     """Generates a new job identifier.
764
765     Job identifiers are unique during the lifetime of a cluster.
766
767     @rtype: str
768     @return: a string representing the job identifier.
769
770     """
771     # New number
772     serial = self._last_serial + 1
773
774     # Write to file
775     self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
776                                         "%s\n" % serial)
777
778     # Keep it only if we were able to write the file
779     self._last_serial = serial
780
781     return self._FormatJobID(serial)
782
783   @staticmethod
784   def _GetJobPath(job_id):
785     """Returns the job file for a given job id.
786
787     @type job_id: str
788     @param job_id: the job identifier
789     @rtype: str
790     @return: the path to the job file
791
792     """
793     return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
794
795   @classmethod
796   def _GetArchivedJobPath(cls, job_id):
797     """Returns the archived job file for a give job id.
798
799     @type job_id: str
800     @param job_id: the job identifier
801     @rtype: str
802     @return: the path to the archived job file
803
804     """
805     path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
806     return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
807
808   @classmethod
809   def _ExtractJobID(cls, name):
810     """Extract the job id from a filename.
811
812     @type name: str
813     @param name: the job filename
814     @rtype: job id or None
815     @return: the job id corresponding to the given filename,
816         or None if the filename does not represent a valid
817         job file
818
819     """
820     m = cls._RE_JOB_FILE.match(name)
821     if m:
822       return m.group(1)
823     else:
824       return None
825
826   def _GetJobIDsUnlocked(self, archived=False):
827     """Return all known job IDs.
828
829     If the parameter archived is True, archived jobs IDs will be
830     included. Currently this argument is unused.
831
832     The method only looks at disk because it's a requirement that all
833     jobs are present on disk (so in the _memcache we don't have any
834     extra IDs).
835
836     @rtype: list
837     @return: the list of job IDs
838
839     """
840     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
841     jlist = utils.NiceSort(jlist)
842     return jlist
843
844   def _ListJobFiles(self):
845     """Returns the list of current job files.
846
847     @rtype: list
848     @return: the list of job file names
849
850     """
851     return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
852             if self._RE_JOB_FILE.match(name)]
853
854   def _LoadJobUnlocked(self, job_id):
855     """Loads a job from the disk or memory.
856
857     Given a job id, this will return the cached job object if
858     existing, or try to load the job from the disk. If loading from
859     disk, it will also add the job to the cache.
860
861     @param job_id: the job id
862     @rtype: L{_QueuedJob} or None
863     @return: either None or the job object
864
865     """
866     job = self._memcache.get(job_id, None)
867     if job:
868       logging.debug("Found job %s in memcache", job_id)
869       return job
870
871     filepath = self._GetJobPath(job_id)
872     logging.debug("Loading job from %s", filepath)
873     try:
874       fd = open(filepath, "r")
875     except IOError, err:
876       if err.errno in (errno.ENOENT, ):
877         return None
878       raise
879     try:
880       data = serializer.LoadJson(fd.read())
881     finally:
882       fd.close()
883
884     try:
885       job = _QueuedJob.Restore(self, data)
886     except Exception, err:
887       new_path = self._GetArchivedJobPath(job_id)
888       if filepath == new_path:
889         # job already archived (future case)
890         logging.exception("Can't parse job %s", job_id)
891       else:
892         # non-archived case
893         logging.exception("Can't parse job %s, will archive.", job_id)
894         self._RenameFilesUnlocked([(filepath, new_path)])
895       return None
896
897     self._memcache[job_id] = job
898     logging.debug("Added job %s to the cache", job_id)
899     return job
900
901   def _GetJobsUnlocked(self, job_ids):
902     """Return a list of jobs based on their IDs.
903
904     @type job_ids: list
905     @param job_ids: either an empty list (meaning all jobs),
906         or a list of job IDs
907     @rtype: list
908     @return: the list of job objects
909
910     """
911     if not job_ids:
912       job_ids = self._GetJobIDsUnlocked()
913
914     return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
915
916   @staticmethod
917   def _IsQueueMarkedDrain():
918     """Check if the queue is marked from drain.
919
920     This currently uses the queue drain file, which makes it a
921     per-node flag. In the future this can be moved to the config file.
922
923     @rtype: boolean
924     @return: True of the job queue is marked for draining
925
926     """
927     return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
928
929   @staticmethod
930   def SetDrainFlag(drain_flag):
931     """Sets the drain flag for the queue.
932
933     This is similar to the function L{backend.JobQueueSetDrainFlag},
934     and in the future we might merge them.
935
936     @type drain_flag: boolean
937     @param drain_flag: wheter to set or unset the drain flag
938
939     """
940     if drain_flag:
941       utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
942     else:
943       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
944     return True
945
946   @_RequireOpenQueue
947   def _SubmitJobUnlocked(self, ops):
948     """Create and store a new job.
949
950     This enters the job into our job queue and also puts it on the new
951     queue, in order for it to be picked up by the queue processors.
952
953     @type ops: list
954     @param ops: The list of OpCodes that will become the new job.
955     @rtype: job ID
956     @return: the job ID of the newly created job
957     @raise errors.JobQueueDrainError: if the job is marked for draining
958
959     """
960     if self._IsQueueMarkedDrain():
961       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
962
963     # Check job queue size
964     size = len(self._ListJobFiles())
965     if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
966       # TODO: Autoarchive jobs. Make sure it's not done on every job
967       # submission, though.
968       #size = ...
969       pass
970
971     if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
972       raise errors.JobQueueFull()
973
974     # Get job identifier
975     job_id = self._NewSerialUnlocked()
976     job = _QueuedJob(self, job_id, ops)
977
978     # Write to disk
979     self.UpdateJobUnlocked(job)
980
981     logging.debug("Adding new job %s to the cache", job_id)
982     self._memcache[job_id] = job
983
984     # Add to worker pool
985     self._wpool.AddTask(job)
986
987     return job.id
988
989   @utils.LockedMethod
990   @_RequireOpenQueue
991   def SubmitJob(self, ops):
992     """Create and store a new job.
993
994     @see: L{_SubmitJobUnlocked}
995
996     """
997     return self._SubmitJobUnlocked(ops)
998
999   @utils.LockedMethod
1000   @_RequireOpenQueue
1001   def SubmitManyJobs(self, jobs):
1002     """Create and store multiple jobs.
1003
1004     @see: L{_SubmitJobUnlocked}
1005
1006     """
1007     results = []
1008     for ops in jobs:
1009       try:
1010         data = self._SubmitJobUnlocked(ops)
1011         status = True
1012       except errors.GenericError, err:
1013         data = str(err)
1014         status = False
1015       results.append((status, data))
1016
1017     return results
1018
1019
1020   @_RequireOpenQueue
1021   def UpdateJobUnlocked(self, job):
1022     """Update a job's on disk storage.
1023
1024     After a job has been modified, this function needs to be called in
1025     order to write the changes to disk and replicate them to the other
1026     nodes.
1027
1028     @type job: L{_QueuedJob}
1029     @param job: the changed job
1030
1031     """
1032     filename = self._GetJobPath(job.id)
1033     data = serializer.DumpJson(job.Serialize(), indent=False)
1034     logging.debug("Writing job %s to %s", job.id, filename)
1035     self._WriteAndReplicateFileUnlocked(filename, data)
1036
1037     # Notify waiters about potential changes
1038     job.change.notifyAll()
1039
1040   @utils.LockedMethod
1041   @_RequireOpenQueue
1042   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1043                         timeout):
1044     """Waits for changes in a job.
1045
1046     @type job_id: string
1047     @param job_id: Job identifier
1048     @type fields: list of strings
1049     @param fields: Which fields to check for changes
1050     @type prev_job_info: list or None
1051     @param prev_job_info: Last job information returned
1052     @type prev_log_serial: int
1053     @param prev_log_serial: Last job message serial number
1054     @type timeout: float
1055     @param timeout: maximum time to wait
1056     @rtype: tuple (job info, log entries)
1057     @return: a tuple of the job information as required via
1058         the fields parameter, and the log entries as a list
1059
1060         if the job has not changed and the timeout has expired,
1061         we instead return a special value,
1062         L{constants.JOB_NOTCHANGED}, which should be interpreted
1063         as such by the clients
1064
1065     """
1066     logging.debug("Waiting for changes in job %s", job_id)
1067     end_time = time.time() + timeout
1068     while True:
1069       delta_time = end_time - time.time()
1070       if delta_time < 0:
1071         return constants.JOB_NOTCHANGED
1072
1073       job = self._LoadJobUnlocked(job_id)
1074       if not job:
1075         logging.debug("Job %s not found", job_id)
1076         break
1077
1078       status = job.CalcStatus()
1079       job_info = self._GetJobInfoUnlocked(job, fields)
1080       log_entries = job.GetLogEntries(prev_log_serial)
1081
1082       # Serializing and deserializing data can cause type changes (e.g. from
1083       # tuple to list) or precision loss. We're doing it here so that we get
1084       # the same modifications as the data received from the client. Without
1085       # this, the comparison afterwards might fail without the data being
1086       # significantly different.
1087       job_info = serializer.LoadJson(serializer.DumpJson(job_info))
1088       log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
1089
1090       if status not in (constants.JOB_STATUS_QUEUED,
1091                         constants.JOB_STATUS_RUNNING,
1092                         constants.JOB_STATUS_WAITLOCK):
1093         # Don't even try to wait if the job is no longer running, there will be
1094         # no changes.
1095         break
1096
1097       if (prev_job_info != job_info or
1098           (log_entries and prev_log_serial != log_entries[0][0])):
1099         break
1100
1101       logging.debug("Waiting again")
1102
1103       # Release the queue lock while waiting
1104       job.change.wait(delta_time)
1105
1106     logging.debug("Job %s changed", job_id)
1107
1108     return (job_info, log_entries)
1109
1110   @utils.LockedMethod
1111   @_RequireOpenQueue
1112   def CancelJob(self, job_id):
1113     """Cancels a job.
1114
1115     This will only succeed if the job has not started yet.
1116
1117     @type job_id: string
1118     @param job_id: job ID of job to be cancelled.
1119
1120     """
1121     logging.info("Cancelling job %s", job_id)
1122
1123     job = self._LoadJobUnlocked(job_id)
1124     if not job:
1125       logging.debug("Job %s not found", job_id)
1126       return (False, "Job %s not found" % job_id)
1127
1128     job_status = job.CalcStatus()
1129
1130     if job_status not in (constants.JOB_STATUS_QUEUED,
1131                           constants.JOB_STATUS_WAITLOCK):
1132       logging.debug("Job %s is no longer in the queue", job.id)
1133       return (False, "Job %s is no longer in the queue" % job.id)
1134
1135     if job_status == constants.JOB_STATUS_QUEUED:
1136       self.CancelJobUnlocked(job)
1137       return (True, "Job %s canceled" % job.id)
1138
1139     elif job_status == constants.JOB_STATUS_WAITLOCK:
1140       # The worker will notice the new status and cancel the job
1141       try:
1142         for op in job.ops:
1143           op.status = constants.OP_STATUS_CANCELING
1144       finally:
1145         self.UpdateJobUnlocked(job)
1146       return (True, "Job %s will be canceled" % job.id)
1147
1148   @_RequireOpenQueue
1149   def CancelJobUnlocked(self, job):
1150     """Marks a job as canceled.
1151
1152     """
1153     try:
1154       for op in job.ops:
1155         op.status = constants.OP_STATUS_CANCELED
1156         op.result = "Job canceled by request"
1157     finally:
1158       self.UpdateJobUnlocked(job)
1159
1160   @_RequireOpenQueue
1161   def _ArchiveJobsUnlocked(self, jobs):
1162     """Archives jobs.
1163
1164     @type jobs: list of L{_QueuedJob}
1165     @param jobs: Job objects
1166     @rtype: int
1167     @return: Number of archived jobs
1168
1169     """
1170     archive_jobs = []
1171     rename_files = []
1172     for job in jobs:
1173       if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1174                                   constants.JOB_STATUS_SUCCESS,
1175                                   constants.JOB_STATUS_ERROR):
1176         logging.debug("Job %s is not yet done", job.id)
1177         continue
1178
1179       archive_jobs.append(job)
1180
1181       old = self._GetJobPath(job.id)
1182       new = self._GetArchivedJobPath(job.id)
1183       rename_files.append((old, new))
1184
1185     # TODO: What if 1..n files fail to rename?
1186     self._RenameFilesUnlocked(rename_files)
1187
1188     logging.debug("Successfully archived job(s) %s",
1189                   ", ".join(job.id for job in archive_jobs))
1190
1191     return len(archive_jobs)
1192
1193   @utils.LockedMethod
1194   @_RequireOpenQueue
1195   def ArchiveJob(self, job_id):
1196     """Archives a job.
1197
1198     This is just a wrapper over L{_ArchiveJobsUnlocked}.
1199
1200     @type job_id: string
1201     @param job_id: Job ID of job to be archived.
1202     @rtype: bool
1203     @return: Whether job was archived
1204
1205     """
1206     logging.info("Archiving job %s", job_id)
1207
1208     job = self._LoadJobUnlocked(job_id)
1209     if not job:
1210       logging.debug("Job %s not found", job_id)
1211       return False
1212
1213     return self._ArchiveJobsUnlocked([job]) == 1
1214
1215   @utils.LockedMethod
1216   @_RequireOpenQueue
1217   def AutoArchiveJobs(self, age, timeout):
1218     """Archives all jobs based on age.
1219
1220     The method will archive all jobs which are older than the age
1221     parameter. For jobs that don't have an end timestamp, the start
1222     timestamp will be considered. The special '-1' age will cause
1223     archival of all jobs (that are not running or queued).
1224
1225     @type age: int
1226     @param age: the minimum age in seconds
1227
1228     """
1229     logging.info("Archiving jobs with age more than %s seconds", age)
1230
1231     now = time.time()
1232     end_time = now + timeout
1233     archived_count = 0
1234     last_touched = 0
1235
1236     all_job_ids = self._GetJobIDsUnlocked(archived=False)
1237     pending = []
1238     for idx, job_id in enumerate(all_job_ids):
1239       last_touched = idx
1240
1241       # Not optimal because jobs could be pending
1242       # TODO: Measure average duration for job archival and take number of
1243       # pending jobs into account.
1244       if time.time() > end_time:
1245         break
1246
1247       # Returns None if the job failed to load
1248       job = self._LoadJobUnlocked(job_id)
1249       if job:
1250         if job.end_timestamp is None:
1251           if job.start_timestamp is None:
1252             job_age = job.received_timestamp
1253           else:
1254             job_age = job.start_timestamp
1255         else:
1256           job_age = job.end_timestamp
1257
1258         if age == -1 or now - job_age[0] > age:
1259           pending.append(job)
1260
1261           # Archive 10 jobs at a time
1262           if len(pending) >= 10:
1263             archived_count += self._ArchiveJobsUnlocked(pending)
1264             pending = []
1265
1266     if pending:
1267       archived_count += self._ArchiveJobsUnlocked(pending)
1268
1269     return (archived_count, len(all_job_ids) - last_touched - 1)
1270
1271   def _GetJobInfoUnlocked(self, job, fields):
1272     """Returns information about a job.
1273
1274     @type job: L{_QueuedJob}
1275     @param job: the job which we query
1276     @type fields: list
1277     @param fields: names of fields to return
1278     @rtype: list
1279     @return: list with one element for each field
1280     @raise errors.OpExecError: when an invalid field
1281         has been passed
1282
1283     """
1284     row = []
1285     for fname in fields:
1286       if fname == "id":
1287         row.append(job.id)
1288       elif fname == "status":
1289         row.append(job.CalcStatus())
1290       elif fname == "ops":
1291         row.append([op.input.__getstate__() for op in job.ops])
1292       elif fname == "opresult":
1293         row.append([op.result for op in job.ops])
1294       elif fname == "opstatus":
1295         row.append([op.status for op in job.ops])
1296       elif fname == "oplog":
1297         row.append([op.log for op in job.ops])
1298       elif fname == "opstart":
1299         row.append([op.start_timestamp for op in job.ops])
1300       elif fname == "opend":
1301         row.append([op.end_timestamp for op in job.ops])
1302       elif fname == "received_ts":
1303         row.append(job.received_timestamp)
1304       elif fname == "start_ts":
1305         row.append(job.start_timestamp)
1306       elif fname == "end_ts":
1307         row.append(job.end_timestamp)
1308       elif fname == "summary":
1309         row.append([op.input.Summary() for op in job.ops])
1310       else:
1311         raise errors.OpExecError("Invalid job query field '%s'" % fname)
1312     return row
1313
1314   @utils.LockedMethod
1315   @_RequireOpenQueue
1316   def QueryJobs(self, job_ids, fields):
1317     """Returns a list of jobs in queue.
1318
1319     This is a wrapper of L{_GetJobsUnlocked}, which actually does the
1320     processing for each job.
1321
1322     @type job_ids: list
1323     @param job_ids: sequence of job identifiers or None for all
1324     @type fields: list
1325     @param fields: names of fields to return
1326     @rtype: list
1327     @return: list one element per job, each element being list with
1328         the requested fields
1329
1330     """
1331     jobs = []
1332
1333     for job in self._GetJobsUnlocked(job_ids):
1334       if job is None:
1335         jobs.append(None)
1336       else:
1337         jobs.append(self._GetJobInfoUnlocked(job, fields))
1338
1339     return jobs
1340
1341   @utils.LockedMethod
1342   @_RequireOpenQueue
1343   def Shutdown(self):
1344     """Stops the job queue.
1345
1346     This shutdowns all the worker threads an closes the queue.
1347
1348     """
1349     self._wpool.TerminateWorkers()
1350
1351     self._queue_lock.Close()
1352     self._queue_lock = None