Move the default MAC prefix to the constants file
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import os
33 import logging
34 import threading
35 import errno
36 import re
37 import time
38 import weakref
39
40 from ganeti import constants
41 from ganeti import serializer
42 from ganeti import workerpool
43 from ganeti import opcodes
44 from ganeti import errors
45 from ganeti import mcpu
46 from ganeti import utils
47 from ganeti import jstore
48 from ganeti import rpc
49
50
51 JOBQUEUE_THREADS = 25
52 JOBS_PER_ARCHIVE_DIRECTORY = 10000
53
54
55 class CancelJob(Exception):
56   """Special exception to cancel a job.
57
58   """
59
60
61 def TimeStampNow():
62   """Returns the current timestamp.
63
64   @rtype: tuple
65   @return: the current time in the (seconds, microseconds) format
66
67   """
68   return utils.SplitTime(time.time())
69
70
71 class _QueuedOpCode(object):
72   """Encasulates an opcode object.
73
74   @ivar log: holds the execution log and consists of tuples
75   of the form C{(log_serial, timestamp, level, message)}
76   @ivar input: the OpCode we encapsulate
77   @ivar status: the current status
78   @ivar result: the result of the LU execution
79   @ivar start_timestamp: timestamp for the start of the execution
80   @ivar stop_timestamp: timestamp for the end of the execution
81
82   """
83   def __init__(self, op):
84     """Constructor for the _QuededOpCode.
85
86     @type op: L{opcodes.OpCode}
87     @param op: the opcode we encapsulate
88
89     """
90     self.input = op
91     self.status = constants.OP_STATUS_QUEUED
92     self.result = None
93     self.log = []
94     self.start_timestamp = None
95     self.end_timestamp = None
96
97   @classmethod
98   def Restore(cls, state):
99     """Restore the _QueuedOpCode from the serialized form.
100
101     @type state: dict
102     @param state: the serialized state
103     @rtype: _QueuedOpCode
104     @return: a new _QueuedOpCode instance
105
106     """
107     obj = _QueuedOpCode.__new__(cls)
108     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
109     obj.status = state["status"]
110     obj.result = state["result"]
111     obj.log = state["log"]
112     obj.start_timestamp = state.get("start_timestamp", None)
113     obj.end_timestamp = state.get("end_timestamp", None)
114     return obj
115
116   def Serialize(self):
117     """Serializes this _QueuedOpCode.
118
119     @rtype: dict
120     @return: the dictionary holding the serialized state
121
122     """
123     return {
124       "input": self.input.__getstate__(),
125       "status": self.status,
126       "result": self.result,
127       "log": self.log,
128       "start_timestamp": self.start_timestamp,
129       "end_timestamp": self.end_timestamp,
130       }
131
132
133 class _QueuedJob(object):
134   """In-memory job representation.
135
136   This is what we use to track the user-submitted jobs. Locking must
137   be taken care of by users of this class.
138
139   @type queue: L{JobQueue}
140   @ivar queue: the parent queue
141   @ivar id: the job ID
142   @type ops: list
143   @ivar ops: the list of _QueuedOpCode that constitute the job
144   @type run_op_index: int
145   @ivar run_op_index: the currently executing opcode, or -1 if
146       we didn't yet start executing
147   @type log_serial: int
148   @ivar log_serial: holds the index for the next log entry
149   @ivar received_timestamp: the timestamp for when the job was received
150   @ivar start_timestmap: the timestamp for start of execution
151   @ivar end_timestamp: the timestamp for end of execution
152   @ivar change: a Condition variable we use for waiting for job changes
153
154   """
155   def __init__(self, queue, job_id, ops):
156     """Constructor for the _QueuedJob.
157
158     @type queue: L{JobQueue}
159     @param queue: our parent queue
160     @type job_id: job_id
161     @param job_id: our job id
162     @type ops: list
163     @param ops: the list of opcodes we hold, which will be encapsulated
164         in _QueuedOpCodes
165
166     """
167     if not ops:
168       # TODO: use a better exception
169       raise Exception("No opcodes")
170
171     self.queue = queue
172     self.id = job_id
173     self.ops = [_QueuedOpCode(op) for op in ops]
174     self.run_op_index = -1
175     self.log_serial = 0
176     self.received_timestamp = TimeStampNow()
177     self.start_timestamp = None
178     self.end_timestamp = None
179
180     # Condition to wait for changes
181     self.change = threading.Condition(self.queue._lock)
182
183   @classmethod
184   def Restore(cls, queue, state):
185     """Restore a _QueuedJob from serialized state:
186
187     @type queue: L{JobQueue}
188     @param queue: to which queue the restored job belongs
189     @type state: dict
190     @param state: the serialized state
191     @rtype: _JobQueue
192     @return: the restored _JobQueue instance
193
194     """
195     obj = _QueuedJob.__new__(cls)
196     obj.queue = queue
197     obj.id = state["id"]
198     obj.run_op_index = state["run_op_index"]
199     obj.received_timestamp = state.get("received_timestamp", None)
200     obj.start_timestamp = state.get("start_timestamp", None)
201     obj.end_timestamp = state.get("end_timestamp", None)
202
203     obj.ops = []
204     obj.log_serial = 0
205     for op_state in state["ops"]:
206       op = _QueuedOpCode.Restore(op_state)
207       for log_entry in op.log:
208         obj.log_serial = max(obj.log_serial, log_entry[0])
209       obj.ops.append(op)
210
211     # Condition to wait for changes
212     obj.change = threading.Condition(obj.queue._lock)
213
214     return obj
215
216   def Serialize(self):
217     """Serialize the _JobQueue instance.
218
219     @rtype: dict
220     @return: the serialized state
221
222     """
223     return {
224       "id": self.id,
225       "ops": [op.Serialize() for op in self.ops],
226       "run_op_index": self.run_op_index,
227       "start_timestamp": self.start_timestamp,
228       "end_timestamp": self.end_timestamp,
229       "received_timestamp": self.received_timestamp,
230       }
231
232   def CalcStatus(self):
233     """Compute the status of this job.
234
235     This function iterates over all the _QueuedOpCodes in the job and
236     based on their status, computes the job status.
237
238     The algorithm is:
239       - if we find a cancelled, or finished with error, the job
240         status will be the same
241       - otherwise, the last opcode with the status one of:
242           - waitlock
243           - canceling
244           - running
245
246         will determine the job status
247
248       - otherwise, it means either all opcodes are queued, or success,
249         and the job status will be the same
250
251     @return: the job status
252
253     """
254     status = constants.JOB_STATUS_QUEUED
255
256     all_success = True
257     for op in self.ops:
258       if op.status == constants.OP_STATUS_SUCCESS:
259         continue
260
261       all_success = False
262
263       if op.status == constants.OP_STATUS_QUEUED:
264         pass
265       elif op.status == constants.OP_STATUS_WAITLOCK:
266         status = constants.JOB_STATUS_WAITLOCK
267       elif op.status == constants.OP_STATUS_RUNNING:
268         status = constants.JOB_STATUS_RUNNING
269       elif op.status == constants.OP_STATUS_CANCELING:
270         status = constants.JOB_STATUS_CANCELING
271         break
272       elif op.status == constants.OP_STATUS_ERROR:
273         status = constants.JOB_STATUS_ERROR
274         # The whole job fails if one opcode failed
275         break
276       elif op.status == constants.OP_STATUS_CANCELED:
277         status = constants.OP_STATUS_CANCELED
278         break
279
280     if all_success:
281       status = constants.JOB_STATUS_SUCCESS
282
283     return status
284
285   def GetLogEntries(self, newer_than):
286     """Selectively returns the log entries.
287
288     @type newer_than: None or int
289     @param newer_than: if this is None, return all log enties,
290         otherwise return only the log entries with serial higher
291         than this value
292     @rtype: list
293     @return: the list of the log entries selected
294
295     """
296     if newer_than is None:
297       serial = -1
298     else:
299       serial = newer_than
300
301     entries = []
302     for op in self.ops:
303       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
304
305     return entries
306
307
308 class _JobQueueWorker(workerpool.BaseWorker):
309   """The actual job workers.
310
311   """
312   def _NotifyStart(self):
313     """Mark the opcode as running, not lock-waiting.
314
315     This is called from the mcpu code as a notifier function, when the
316     LU is finally about to start the Exec() method. Of course, to have
317     end-user visible results, the opcode must be initially (before
318     calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
319
320     """
321     assert self.queue, "Queue attribute is missing"
322     assert self.opcode, "Opcode attribute is missing"
323
324     self.queue.acquire()
325     try:
326       assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
327                                     constants.OP_STATUS_CANCELING)
328
329       # Cancel here if we were asked to
330       if self.opcode.status == constants.OP_STATUS_CANCELING:
331         raise CancelJob()
332
333       self.opcode.status = constants.OP_STATUS_RUNNING
334     finally:
335       self.queue.release()
336
337   def RunTask(self, job):
338     """Job executor.
339
340     This functions processes a job. It is closely tied to the _QueuedJob and
341     _QueuedOpCode classes.
342
343     @type job: L{_QueuedJob}
344     @param job: the job to be processed
345
346     """
347     logging.debug("Worker %s processing job %s",
348                   self.worker_id, job.id)
349     proc = mcpu.Processor(self.pool.queue.context)
350     self.queue = queue = job.queue
351     try:
352       try:
353         count = len(job.ops)
354         for idx, op in enumerate(job.ops):
355           try:
356             logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
357
358             queue.acquire()
359             try:
360               assert op.status == constants.OP_STATUS_QUEUED
361               job.run_op_index = idx
362               op.status = constants.OP_STATUS_WAITLOCK
363               op.result = None
364               op.start_timestamp = TimeStampNow()
365               if idx == 0: # first opcode
366                 job.start_timestamp = op.start_timestamp
367               queue.UpdateJobUnlocked(job)
368
369               input_opcode = op.input
370             finally:
371               queue.release()
372
373             def _Log(*args):
374               """Append a log entry.
375
376               """
377               assert len(args) < 3
378
379               if len(args) == 1:
380                 log_type = constants.ELOG_MESSAGE
381                 log_msg = args[0]
382               else:
383                 log_type, log_msg = args
384
385               # The time is split to make serialization easier and not lose
386               # precision.
387               timestamp = utils.SplitTime(time.time())
388
389               queue.acquire()
390               try:
391                 job.log_serial += 1
392                 op.log.append((job.log_serial, timestamp, log_type, log_msg))
393
394                 job.change.notifyAll()
395               finally:
396                 queue.release()
397
398             # Make sure not to hold lock while _Log is called
399             self.opcode = op
400             result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
401
402             queue.acquire()
403             try:
404               op.status = constants.OP_STATUS_SUCCESS
405               op.result = result
406               op.end_timestamp = TimeStampNow()
407               queue.UpdateJobUnlocked(job)
408             finally:
409               queue.release()
410
411             logging.debug("Op %s/%s: Successfully finished %s",
412                           idx + 1, count, op)
413           except CancelJob:
414             # Will be handled further up
415             raise
416           except Exception, err:
417             queue.acquire()
418             try:
419               try:
420                 op.status = constants.OP_STATUS_ERROR
421                 op.result = str(err)
422                 op.end_timestamp = TimeStampNow()
423                 logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
424               finally:
425                 queue.UpdateJobUnlocked(job)
426             finally:
427               queue.release()
428             raise
429
430       except CancelJob:
431         queue.acquire()
432         try:
433           queue.CancelJobUnlocked(job)
434         finally:
435           queue.release()
436       except errors.GenericError, err:
437         logging.exception("Ganeti exception")
438       except:
439         logging.exception("Unhandled exception")
440     finally:
441       queue.acquire()
442       try:
443         try:
444           job.run_op_idx = -1
445           job.end_timestamp = TimeStampNow()
446           queue.UpdateJobUnlocked(job)
447         finally:
448           job_id = job.id
449           status = job.CalcStatus()
450       finally:
451         queue.release()
452       logging.debug("Worker %s finished job %s, status = %s",
453                     self.worker_id, job_id, status)
454
455
456 class _JobQueueWorkerPool(workerpool.WorkerPool):
457   """Simple class implementing a job-processing workerpool.
458
459   """
460   def __init__(self, queue):
461     super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
462                                               _JobQueueWorker)
463     self.queue = queue
464
465
466 class JobQueue(object):
467   """Quue used to manaage the jobs.
468
469   @cvar _RE_JOB_FILE: regex matching the valid job file names
470
471   """
472   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
473
474   def _RequireOpenQueue(fn):
475     """Decorator for "public" functions.
476
477     This function should be used for all 'public' functions. That is,
478     functions usually called from other classes.
479
480     @warning: Use this decorator only after utils.LockedMethod!
481
482     Example::
483       @utils.LockedMethod
484       @_RequireOpenQueue
485       def Example(self):
486         pass
487
488     """
489     def wrapper(self, *args, **kwargs):
490       assert self._queue_lock is not None, "Queue should be open"
491       return fn(self, *args, **kwargs)
492     return wrapper
493
494   def __init__(self, context):
495     """Constructor for JobQueue.
496
497     The constructor will initialize the job queue object and then
498     start loading the current jobs from disk, either for starting them
499     (if they were queue) or for aborting them (if they were already
500     running).
501
502     @type context: GanetiContext
503     @param context: the context object for access to the configuration
504         data and other ganeti objects
505
506     """
507     self.context = context
508     self._memcache = weakref.WeakValueDictionary()
509     self._my_hostname = utils.HostInfo().name
510
511     # Locking
512     self._lock = threading.Lock()
513     self.acquire = self._lock.acquire
514     self.release = self._lock.release
515
516     # Initialize
517     self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
518
519     # Read serial file
520     self._last_serial = jstore.ReadSerial()
521     assert self._last_serial is not None, ("Serial file was modified between"
522                                            " check in jstore and here")
523
524     # Get initial list of nodes
525     self._nodes = dict((n.name, n.primary_ip)
526                        for n in self.context.cfg.GetAllNodesInfo().values()
527                        if n.master_candidate)
528
529     # Remove master node
530     try:
531       del self._nodes[self._my_hostname]
532     except KeyError:
533       pass
534
535     # TODO: Check consistency across nodes
536
537     # Setup worker pool
538     self._wpool = _JobQueueWorkerPool(self)
539     try:
540       # We need to lock here because WorkerPool.AddTask() may start a job while
541       # we're still doing our work.
542       self.acquire()
543       try:
544         logging.info("Inspecting job queue")
545
546         all_job_ids = self._GetJobIDsUnlocked()
547         jobs_count = len(all_job_ids)
548         lastinfo = time.time()
549         for idx, job_id in enumerate(all_job_ids):
550           # Give an update every 1000 jobs or 10 seconds
551           if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
552               idx == (jobs_count - 1)):
553             logging.info("Job queue inspection: %d/%d (%0.1f %%)",
554                          idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
555             lastinfo = time.time()
556
557           job = self._LoadJobUnlocked(job_id)
558
559           # a failure in loading the job can cause 'None' to be returned
560           if job is None:
561             continue
562
563           status = job.CalcStatus()
564
565           if status in (constants.JOB_STATUS_QUEUED, ):
566             self._wpool.AddTask(job)
567
568           elif status in (constants.JOB_STATUS_RUNNING,
569                           constants.JOB_STATUS_WAITLOCK,
570                           constants.JOB_STATUS_CANCELING):
571             logging.warning("Unfinished job %s found: %s", job.id, job)
572             try:
573               for op in job.ops:
574                 op.status = constants.OP_STATUS_ERROR
575                 op.result = "Unclean master daemon shutdown"
576             finally:
577               self.UpdateJobUnlocked(job)
578
579         logging.info("Job queue inspection finished")
580       finally:
581         self.release()
582     except:
583       self._wpool.TerminateWorkers()
584       raise
585
586   @utils.LockedMethod
587   @_RequireOpenQueue
588   def AddNode(self, node):
589     """Register a new node with the queue.
590
591     @type node: L{objects.Node}
592     @param node: the node object to be added
593
594     """
595     node_name = node.name
596     assert node_name != self._my_hostname
597
598     # Clean queue directory on added node
599     rpc.RpcRunner.call_jobqueue_purge(node_name)
600
601     if not node.master_candidate:
602       # remove if existing, ignoring errors
603       self._nodes.pop(node_name, None)
604       # and skip the replication of the job ids
605       return
606
607     # Upload the whole queue excluding archived jobs
608     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
609
610     # Upload current serial file
611     files.append(constants.JOB_QUEUE_SERIAL_FILE)
612
613     for file_name in files:
614       # Read file content
615       fd = open(file_name, "r")
616       try:
617         content = fd.read()
618       finally:
619         fd.close()
620
621       result = rpc.RpcRunner.call_jobqueue_update([node_name],
622                                                   [node.primary_ip],
623                                                   file_name, content)
624       if not result[node_name]:
625         logging.error("Failed to upload %s to %s", file_name, node_name)
626
627     self._nodes[node_name] = node.primary_ip
628
629   @utils.LockedMethod
630   @_RequireOpenQueue
631   def RemoveNode(self, node_name):
632     """Callback called when removing nodes from the cluster.
633
634     @type node_name: str
635     @param node_name: the name of the node to remove
636
637     """
638     try:
639       # The queue is removed by the "leave node" RPC call.
640       del self._nodes[node_name]
641     except KeyError:
642       pass
643
644   def _CheckRpcResult(self, result, nodes, failmsg):
645     """Verifies the status of an RPC call.
646
647     Since we aim to keep consistency should this node (the current
648     master) fail, we will log errors if our rpc fail, and especially
649     log the case when more than half of the nodes failes.
650
651     @param result: the data as returned from the rpc call
652     @type nodes: list
653     @param nodes: the list of nodes we made the call to
654     @type failmsg: str
655     @param failmsg: the identifier to be used for logging
656
657     """
658     failed = []
659     success = []
660
661     for node in nodes:
662       if result[node]:
663         success.append(node)
664       else:
665         failed.append(node)
666
667     if failed:
668       logging.error("%s failed on %s", failmsg, ", ".join(failed))
669
670     # +1 for the master node
671     if (len(success) + 1) < len(failed):
672       # TODO: Handle failing nodes
673       logging.error("More than half of the nodes failed")
674
675   def _GetNodeIp(self):
676     """Helper for returning the node name/ip list.
677
678     @rtype: (list, list)
679     @return: a tuple of two lists, the first one with the node
680         names and the second one with the node addresses
681
682     """
683     name_list = self._nodes.keys()
684     addr_list = [self._nodes[name] for name in name_list]
685     return name_list, addr_list
686
687   def _WriteAndReplicateFileUnlocked(self, file_name, data):
688     """Writes a file locally and then replicates it to all nodes.
689
690     This function will replace the contents of a file on the local
691     node and then replicate it to all the other nodes we have.
692
693     @type file_name: str
694     @param file_name: the path of the file to be replicated
695     @type data: str
696     @param data: the new contents of the file
697
698     """
699     utils.WriteFile(file_name, data=data)
700
701     names, addrs = self._GetNodeIp()
702     result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
703     self._CheckRpcResult(result, self._nodes,
704                          "Updating %s" % file_name)
705
706   def _RenameFilesUnlocked(self, rename):
707     """Renames a file locally and then replicate the change.
708
709     This function will rename a file in the local queue directory
710     and then replicate this rename to all the other nodes we have.
711
712     @type rename: list of (old, new)
713     @param rename: List containing tuples mapping old to new names
714
715     """
716     # Rename them locally
717     for old, new in rename:
718       utils.RenameFile(old, new, mkdir=True)
719
720     # ... and on all nodes
721     names, addrs = self._GetNodeIp()
722     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
723     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
724
725   def _FormatJobID(self, job_id):
726     """Convert a job ID to string format.
727
728     Currently this just does C{str(job_id)} after performing some
729     checks, but if we want to change the job id format this will
730     abstract this change.
731
732     @type job_id: int or long
733     @param job_id: the numeric job id
734     @rtype: str
735     @return: the formatted job id
736
737     """
738     if not isinstance(job_id, (int, long)):
739       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
740     if job_id < 0:
741       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
742
743     return str(job_id)
744
745   @classmethod
746   def _GetArchiveDirectory(cls, job_id):
747     """Returns the archive directory for a job.
748
749     @type job_id: str
750     @param job_id: Job identifier
751     @rtype: str
752     @return: Directory name
753
754     """
755     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
756
757   def _NewSerialUnlocked(self):
758     """Generates a new job identifier.
759
760     Job identifiers are unique during the lifetime of a cluster.
761
762     @rtype: str
763     @return: a string representing the job identifier.
764
765     """
766     # New number
767     serial = self._last_serial + 1
768
769     # Write to file
770     self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
771                                         "%s\n" % serial)
772
773     # Keep it only if we were able to write the file
774     self._last_serial = serial
775
776     return self._FormatJobID(serial)
777
778   @staticmethod
779   def _GetJobPath(job_id):
780     """Returns the job file for a given job id.
781
782     @type job_id: str
783     @param job_id: the job identifier
784     @rtype: str
785     @return: the path to the job file
786
787     """
788     return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
789
790   @classmethod
791   def _GetArchivedJobPath(cls, job_id):
792     """Returns the archived job file for a give job id.
793
794     @type job_id: str
795     @param job_id: the job identifier
796     @rtype: str
797     @return: the path to the archived job file
798
799     """
800     path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
801     return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
802
803   @classmethod
804   def _ExtractJobID(cls, name):
805     """Extract the job id from a filename.
806
807     @type name: str
808     @param name: the job filename
809     @rtype: job id or None
810     @return: the job id corresponding to the given filename,
811         or None if the filename does not represent a valid
812         job file
813
814     """
815     m = cls._RE_JOB_FILE.match(name)
816     if m:
817       return m.group(1)
818     else:
819       return None
820
821   def _GetJobIDsUnlocked(self, archived=False):
822     """Return all known job IDs.
823
824     If the parameter archived is True, archived jobs IDs will be
825     included. Currently this argument is unused.
826
827     The method only looks at disk because it's a requirement that all
828     jobs are present on disk (so in the _memcache we don't have any
829     extra IDs).
830
831     @rtype: list
832     @return: the list of job IDs
833
834     """
835     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
836     jlist = utils.NiceSort(jlist)
837     return jlist
838
839   def _ListJobFiles(self):
840     """Returns the list of current job files.
841
842     @rtype: list
843     @return: the list of job file names
844
845     """
846     return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
847             if self._RE_JOB_FILE.match(name)]
848
849   def _LoadJobUnlocked(self, job_id):
850     """Loads a job from the disk or memory.
851
852     Given a job id, this will return the cached job object if
853     existing, or try to load the job from the disk. If loading from
854     disk, it will also add the job to the cache.
855
856     @param job_id: the job id
857     @rtype: L{_QueuedJob} or None
858     @return: either None or the job object
859
860     """
861     job = self._memcache.get(job_id, None)
862     if job:
863       logging.debug("Found job %s in memcache", job_id)
864       return job
865
866     filepath = self._GetJobPath(job_id)
867     logging.debug("Loading job from %s", filepath)
868     try:
869       fd = open(filepath, "r")
870     except IOError, err:
871       if err.errno in (errno.ENOENT, ):
872         return None
873       raise
874     try:
875       data = serializer.LoadJson(fd.read())
876     finally:
877       fd.close()
878
879     try:
880       job = _QueuedJob.Restore(self, data)
881     except Exception, err:
882       new_path = self._GetArchivedJobPath(job_id)
883       if filepath == new_path:
884         # job already archived (future case)
885         logging.exception("Can't parse job %s", job_id)
886       else:
887         # non-archived case
888         logging.exception("Can't parse job %s, will archive.", job_id)
889         self._RenameFilesUnlocked([(filepath, new_path)])
890       return None
891
892     self._memcache[job_id] = job
893     logging.debug("Added job %s to the cache", job_id)
894     return job
895
896   def _GetJobsUnlocked(self, job_ids):
897     """Return a list of jobs based on their IDs.
898
899     @type job_ids: list
900     @param job_ids: either an empty list (meaning all jobs),
901         or a list of job IDs
902     @rtype: list
903     @return: the list of job objects
904
905     """
906     if not job_ids:
907       job_ids = self._GetJobIDsUnlocked()
908
909     return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
910
911   @staticmethod
912   def _IsQueueMarkedDrain():
913     """Check if the queue is marked from drain.
914
915     This currently uses the queue drain file, which makes it a
916     per-node flag. In the future this can be moved to the config file.
917
918     @rtype: boolean
919     @return: True of the job queue is marked for draining
920
921     """
922     return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
923
924   @staticmethod
925   def SetDrainFlag(drain_flag):
926     """Sets the drain flag for the queue.
927
928     This is similar to the function L{backend.JobQueueSetDrainFlag},
929     and in the future we might merge them.
930
931     @type drain_flag: boolean
932     @param drain_flag: wheter to set or unset the drain flag
933
934     """
935     if drain_flag:
936       utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
937     else:
938       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
939     return True
940
941   @utils.LockedMethod
942   @_RequireOpenQueue
943   def SubmitJob(self, ops):
944     """Create and store a new job.
945
946     This enters the job into our job queue and also puts it on the new
947     queue, in order for it to be picked up by the queue processors.
948
949     @type ops: list
950     @param ops: The list of OpCodes that will become the new job.
951     @rtype: job ID
952     @return: the job ID of the newly created job
953     @raise errors.JobQueueDrainError: if the job is marked for draining
954
955     """
956     if self._IsQueueMarkedDrain():
957       raise errors.JobQueueDrainError()
958
959     # Check job queue size
960     size = len(self._ListJobFiles())
961     if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
962       # TODO: Autoarchive jobs. Make sure it's not done on every job
963       # submission, though.
964       #size = ...
965       pass
966
967     if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
968       raise errors.JobQueueFull()
969
970     # Get job identifier
971     job_id = self._NewSerialUnlocked()
972     job = _QueuedJob(self, job_id, ops)
973
974     # Write to disk
975     self.UpdateJobUnlocked(job)
976
977     logging.debug("Adding new job %s to the cache", job_id)
978     self._memcache[job_id] = job
979
980     # Add to worker pool
981     self._wpool.AddTask(job)
982
983     return job.id
984
985   @_RequireOpenQueue
986   def UpdateJobUnlocked(self, job):
987     """Update a job's on disk storage.
988
989     After a job has been modified, this function needs to be called in
990     order to write the changes to disk and replicate them to the other
991     nodes.
992
993     @type job: L{_QueuedJob}
994     @param job: the changed job
995
996     """
997     filename = self._GetJobPath(job.id)
998     data = serializer.DumpJson(job.Serialize(), indent=False)
999     logging.debug("Writing job %s to %s", job.id, filename)
1000     self._WriteAndReplicateFileUnlocked(filename, data)
1001
1002     # Notify waiters about potential changes
1003     job.change.notifyAll()
1004
1005   @utils.LockedMethod
1006   @_RequireOpenQueue
1007   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1008                         timeout):
1009     """Waits for changes in a job.
1010
1011     @type job_id: string
1012     @param job_id: Job identifier
1013     @type fields: list of strings
1014     @param fields: Which fields to check for changes
1015     @type prev_job_info: list or None
1016     @param prev_job_info: Last job information returned
1017     @type prev_log_serial: int
1018     @param prev_log_serial: Last job message serial number
1019     @type timeout: float
1020     @param timeout: maximum time to wait
1021     @rtype: tuple (job info, log entries)
1022     @return: a tuple of the job information as required via
1023         the fields parameter, and the log entries as a list
1024
1025         if the job has not changed and the timeout has expired,
1026         we instead return a special value,
1027         L{constants.JOB_NOTCHANGED}, which should be interpreted
1028         as such by the clients
1029
1030     """
1031     logging.debug("Waiting for changes in job %s", job_id)
1032     end_time = time.time() + timeout
1033     while True:
1034       delta_time = end_time - time.time()
1035       if delta_time < 0:
1036         return constants.JOB_NOTCHANGED
1037
1038       job = self._LoadJobUnlocked(job_id)
1039       if not job:
1040         logging.debug("Job %s not found", job_id)
1041         break
1042
1043       status = job.CalcStatus()
1044       job_info = self._GetJobInfoUnlocked(job, fields)
1045       log_entries = job.GetLogEntries(prev_log_serial)
1046
1047       # Serializing and deserializing data can cause type changes (e.g. from
1048       # tuple to list) or precision loss. We're doing it here so that we get
1049       # the same modifications as the data received from the client. Without
1050       # this, the comparison afterwards might fail without the data being
1051       # significantly different.
1052       job_info = serializer.LoadJson(serializer.DumpJson(job_info))
1053       log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
1054
1055       if status not in (constants.JOB_STATUS_QUEUED,
1056                         constants.JOB_STATUS_RUNNING,
1057                         constants.JOB_STATUS_WAITLOCK):
1058         # Don't even try to wait if the job is no longer running, there will be
1059         # no changes.
1060         break
1061
1062       if (prev_job_info != job_info or
1063           (log_entries and prev_log_serial != log_entries[0][0])):
1064         break
1065
1066       logging.debug("Waiting again")
1067
1068       # Release the queue lock while waiting
1069       job.change.wait(delta_time)
1070
1071     logging.debug("Job %s changed", job_id)
1072
1073     return (job_info, log_entries)
1074
1075   @utils.LockedMethod
1076   @_RequireOpenQueue
1077   def CancelJob(self, job_id):
1078     """Cancels a job.
1079
1080     This will only succeed if the job has not started yet.
1081
1082     @type job_id: string
1083     @param job_id: job ID of job to be cancelled.
1084
1085     """
1086     logging.info("Cancelling job %s", job_id)
1087
1088     job = self._LoadJobUnlocked(job_id)
1089     if not job:
1090       logging.debug("Job %s not found", job_id)
1091       return (False, "Job %s not found" % job_id)
1092
1093     job_status = job.CalcStatus()
1094
1095     if job_status not in (constants.JOB_STATUS_QUEUED,
1096                           constants.JOB_STATUS_WAITLOCK):
1097       logging.debug("Job %s is no longer in the queue", job.id)
1098       return (False, "Job %s is no longer in the queue" % job.id)
1099
1100     if job_status == constants.JOB_STATUS_QUEUED:
1101       self.CancelJobUnlocked(job)
1102       return (True, "Job %s canceled" % job.id)
1103
1104     elif job_status == constants.JOB_STATUS_WAITLOCK:
1105       # The worker will notice the new status and cancel the job
1106       try:
1107         for op in job.ops:
1108           op.status = constants.OP_STATUS_CANCELING
1109       finally:
1110         self.UpdateJobUnlocked(job)
1111       return (True, "Job %s will be canceled" % job.id)
1112
1113   @_RequireOpenQueue
1114   def CancelJobUnlocked(self, job):
1115     """Marks a job as canceled.
1116
1117     """
1118     try:
1119       for op in job.ops:
1120         op.status = constants.OP_STATUS_ERROR
1121         op.result = "Job canceled by request"
1122     finally:
1123       self.UpdateJobUnlocked(job)
1124
1125   @_RequireOpenQueue
1126   def _ArchiveJobsUnlocked(self, jobs):
1127     """Archives jobs.
1128
1129     @type jobs: list of L{_QueuedJob}
1130     @param jobs: Job objects
1131     @rtype: int
1132     @return: Number of archived jobs
1133
1134     """
1135     archive_jobs = []
1136     rename_files = []
1137     for job in jobs:
1138       if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1139                                   constants.JOB_STATUS_SUCCESS,
1140                                   constants.JOB_STATUS_ERROR):
1141         logging.debug("Job %s is not yet done", job.id)
1142         continue
1143
1144       archive_jobs.append(job)
1145
1146       old = self._GetJobPath(job.id)
1147       new = self._GetArchivedJobPath(job.id)
1148       rename_files.append((old, new))
1149
1150     # TODO: What if 1..n files fail to rename?
1151     self._RenameFilesUnlocked(rename_files)
1152
1153     logging.debug("Successfully archived job(s) %s",
1154                   ", ".join(job.id for job in archive_jobs))
1155
1156     return len(archive_jobs)
1157
1158   @utils.LockedMethod
1159   @_RequireOpenQueue
1160   def ArchiveJob(self, job_id):
1161     """Archives a job.
1162
1163     This is just a wrapper over L{_ArchiveJobsUnlocked}.
1164
1165     @type job_id: string
1166     @param job_id: Job ID of job to be archived.
1167     @rtype: bool
1168     @return: Whether job was archived
1169
1170     """
1171     logging.info("Archiving job %s", job_id)
1172
1173     job = self._LoadJobUnlocked(job_id)
1174     if not job:
1175       logging.debug("Job %s not found", job_id)
1176       return False
1177
1178     return self._ArchiveJobUnlocked([job]) == 1
1179
1180   @utils.LockedMethod
1181   @_RequireOpenQueue
1182   def AutoArchiveJobs(self, age, timeout):
1183     """Archives all jobs based on age.
1184
1185     The method will archive all jobs which are older than the age
1186     parameter. For jobs that don't have an end timestamp, the start
1187     timestamp will be considered. The special '-1' age will cause
1188     archival of all jobs (that are not running or queued).
1189
1190     @type age: int
1191     @param age: the minimum age in seconds
1192
1193     """
1194     logging.info("Archiving jobs with age more than %s seconds", age)
1195
1196     now = time.time()
1197     end_time = now + timeout
1198     archived_count = 0
1199     last_touched = 0
1200
1201     all_job_ids = self._GetJobIDsUnlocked(archived=False)
1202     pending = []
1203     for idx, job_id in enumerate(all_job_ids):
1204       last_touched = idx
1205
1206       # Not optimal because jobs could be pending
1207       # TODO: Measure average duration for job archival and take number of
1208       # pending jobs into account.
1209       if time.time() > end_time:
1210         break
1211
1212       # Returns None if the job failed to load
1213       job = self._LoadJobUnlocked(job_id)
1214       if job:
1215         if job.end_timestamp is None:
1216           if job.start_timestamp is None:
1217             job_age = job.received_timestamp
1218           else:
1219             job_age = job.start_timestamp
1220         else:
1221           job_age = job.end_timestamp
1222
1223         if age == -1 or now - job_age[0] > age:
1224           pending.append(job)
1225
1226           # Archive 10 jobs at a time
1227           if len(pending) >= 10:
1228             archived_count += self._ArchiveJobsUnlocked(pending)
1229             pending = []
1230
1231     if pending:
1232       archived_count += self._ArchiveJobsUnlocked(pending)
1233
1234     return (archived_count, len(all_job_ids) - last_touched - 1)
1235
1236   def _GetJobInfoUnlocked(self, job, fields):
1237     """Returns information about a job.
1238
1239     @type job: L{_QueuedJob}
1240     @param job: the job which we query
1241     @type fields: list
1242     @param fields: names of fields to return
1243     @rtype: list
1244     @return: list with one element for each field
1245     @raise errors.OpExecError: when an invalid field
1246         has been passed
1247
1248     """
1249     row = []
1250     for fname in fields:
1251       if fname == "id":
1252         row.append(job.id)
1253       elif fname == "status":
1254         row.append(job.CalcStatus())
1255       elif fname == "ops":
1256         row.append([op.input.__getstate__() for op in job.ops])
1257       elif fname == "opresult":
1258         row.append([op.result for op in job.ops])
1259       elif fname == "opstatus":
1260         row.append([op.status for op in job.ops])
1261       elif fname == "oplog":
1262         row.append([op.log for op in job.ops])
1263       elif fname == "opstart":
1264         row.append([op.start_timestamp for op in job.ops])
1265       elif fname == "opend":
1266         row.append([op.end_timestamp for op in job.ops])
1267       elif fname == "received_ts":
1268         row.append(job.received_timestamp)
1269       elif fname == "start_ts":
1270         row.append(job.start_timestamp)
1271       elif fname == "end_ts":
1272         row.append(job.end_timestamp)
1273       elif fname == "summary":
1274         row.append([op.input.Summary() for op in job.ops])
1275       else:
1276         raise errors.OpExecError("Invalid job query field '%s'" % fname)
1277     return row
1278
1279   @utils.LockedMethod
1280   @_RequireOpenQueue
1281   def QueryJobs(self, job_ids, fields):
1282     """Returns a list of jobs in queue.
1283
1284     This is a wrapper of L{_GetJobsUnlocked}, which actually does the
1285     processing for each job.
1286
1287     @type job_ids: list
1288     @param job_ids: sequence of job identifiers or None for all
1289     @type fields: list
1290     @param fields: names of fields to return
1291     @rtype: list
1292     @return: list one element per job, each element being list with
1293         the requested fields
1294
1295     """
1296     jobs = []
1297
1298     for job in self._GetJobsUnlocked(job_ids):
1299       if job is None:
1300         jobs.append(None)
1301       else:
1302         jobs.append(self._GetJobInfoUnlocked(job, fields))
1303
1304     return jobs
1305
1306   @utils.LockedMethod
1307   @_RequireOpenQueue
1308   def Shutdown(self):
1309     """Stops the job queue.
1310
1311     This shutdowns all the worker threads an closes the queue.
1312
1313     """
1314     self._wpool.TerminateWorkers()
1315
1316     self._queue_lock.Close()
1317     self._queue_lock = None