Partial cherry-pick of 6c881c5 from the 2.1 branch
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import os
33 import logging
34 import threading
35 import errno
36 import re
37 import time
38 import weakref
39
40 from ganeti import constants
41 from ganeti import serializer
42 from ganeti import workerpool
43 from ganeti import opcodes
44 from ganeti import errors
45 from ganeti import mcpu
46 from ganeti import utils
47 from ganeti import jstore
48 from ganeti import rpc
49
50
51 JOBQUEUE_THREADS = 25
52 JOBS_PER_ARCHIVE_DIRECTORY = 10000
53
54
55 class CancelJob(Exception):
56   """Special exception to cancel a job.
57
58   """
59
60
61 def TimeStampNow():
62   """Returns the current timestamp.
63
64   @rtype: tuple
65   @return: the current time in the (seconds, microseconds) format
66
67   """
68   return utils.SplitTime(time.time())
69
70
71 class _QueuedOpCode(object):
72   """Encapsulates an opcode object.
73
74   @ivar log: holds the execution log and consists of tuples
75   of the form C{(log_serial, timestamp, level, message)}
76   @ivar input: the OpCode we encapsulate
77   @ivar status: the current status
78   @ivar result: the result of the LU execution
79   @ivar start_timestamp: timestamp for the start of the execution
80   @ivar stop_timestamp: timestamp for the end of the execution
81
82   """
83   __slots__ = ["input", "status", "result", "log",
84                "start_timestamp", "end_timestamp",
85                "__weakref__"]
86
87   def __init__(self, op):
88     """Constructor for the _QuededOpCode.
89
90     @type op: L{opcodes.OpCode}
91     @param op: the opcode we encapsulate
92
93     """
94     self.input = op
95     self.status = constants.OP_STATUS_QUEUED
96     self.result = None
97     self.log = []
98     self.start_timestamp = None
99     self.end_timestamp = None
100
101   @classmethod
102   def Restore(cls, state):
103     """Restore the _QueuedOpCode from the serialized form.
104
105     @type state: dict
106     @param state: the serialized state
107     @rtype: _QueuedOpCode
108     @return: a new _QueuedOpCode instance
109
110     """
111     obj = _QueuedOpCode.__new__(cls)
112     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
113     obj.status = state["status"]
114     obj.result = state["result"]
115     obj.log = state["log"]
116     obj.start_timestamp = state.get("start_timestamp", None)
117     obj.end_timestamp = state.get("end_timestamp", None)
118     return obj
119
120   def Serialize(self):
121     """Serializes this _QueuedOpCode.
122
123     @rtype: dict
124     @return: the dictionary holding the serialized state
125
126     """
127     return {
128       "input": self.input.__getstate__(),
129       "status": self.status,
130       "result": self.result,
131       "log": self.log,
132       "start_timestamp": self.start_timestamp,
133       "end_timestamp": self.end_timestamp,
134       }
135
136
137 class _QueuedJob(object):
138   """In-memory job representation.
139
140   This is what we use to track the user-submitted jobs. Locking must
141   be taken care of by users of this class.
142
143   @type queue: L{JobQueue}
144   @ivar queue: the parent queue
145   @ivar id: the job ID
146   @type ops: list
147   @ivar ops: the list of _QueuedOpCode that constitute the job
148   @type run_op_index: int
149   @ivar run_op_index: the currently executing opcode, or -1 if
150       we didn't yet start executing
151   @type log_serial: int
152   @ivar log_serial: holds the index for the next log entry
153   @ivar received_timestamp: the timestamp for when the job was received
154   @ivar start_timestmap: the timestamp for start of execution
155   @ivar end_timestamp: the timestamp for end of execution
156   @ivar change: a Condition variable we use for waiting for job changes
157
158   """
159   __slots__ = ["queue", "id", "ops", "run_op_index", "log_serial",
160                "received_timestamp", "start_timestamp", "end_timestamp",
161                "change",
162                "__weakref__"]
163
164   def __init__(self, queue, job_id, ops):
165     """Constructor for the _QueuedJob.
166
167     @type queue: L{JobQueue}
168     @param queue: our parent queue
169     @type job_id: job_id
170     @param job_id: our job id
171     @type ops: list
172     @param ops: the list of opcodes we hold, which will be encapsulated
173         in _QueuedOpCodes
174
175     """
176     if not ops:
177       # TODO: use a better exception
178       raise Exception("No opcodes")
179
180     self.queue = queue
181     self.id = job_id
182     self.ops = [_QueuedOpCode(op) for op in ops]
183     self.run_op_index = -1
184     self.log_serial = 0
185     self.received_timestamp = TimeStampNow()
186     self.start_timestamp = None
187     self.end_timestamp = None
188
189     # Condition to wait for changes
190     self.change = threading.Condition(self.queue._lock)
191
192   @classmethod
193   def Restore(cls, queue, state):
194     """Restore a _QueuedJob from serialized state:
195
196     @type queue: L{JobQueue}
197     @param queue: to which queue the restored job belongs
198     @type state: dict
199     @param state: the serialized state
200     @rtype: _JobQueue
201     @return: the restored _JobQueue instance
202
203     """
204     obj = _QueuedJob.__new__(cls)
205     obj.queue = queue
206     obj.id = state["id"]
207     obj.run_op_index = state["run_op_index"]
208     obj.received_timestamp = state.get("received_timestamp", None)
209     obj.start_timestamp = state.get("start_timestamp", None)
210     obj.end_timestamp = state.get("end_timestamp", None)
211
212     obj.ops = []
213     obj.log_serial = 0
214     for op_state in state["ops"]:
215       op = _QueuedOpCode.Restore(op_state)
216       for log_entry in op.log:
217         obj.log_serial = max(obj.log_serial, log_entry[0])
218       obj.ops.append(op)
219
220     # Condition to wait for changes
221     obj.change = threading.Condition(obj.queue._lock)
222
223     return obj
224
225   def Serialize(self):
226     """Serialize the _JobQueue instance.
227
228     @rtype: dict
229     @return: the serialized state
230
231     """
232     return {
233       "id": self.id,
234       "ops": [op.Serialize() for op in self.ops],
235       "run_op_index": self.run_op_index,
236       "start_timestamp": self.start_timestamp,
237       "end_timestamp": self.end_timestamp,
238       "received_timestamp": self.received_timestamp,
239       }
240
241   def CalcStatus(self):
242     """Compute the status of this job.
243
244     This function iterates over all the _QueuedOpCodes in the job and
245     based on their status, computes the job status.
246
247     The algorithm is:
248       - if we find a cancelled, or finished with error, the job
249         status will be the same
250       - otherwise, the last opcode with the status one of:
251           - waitlock
252           - canceling
253           - running
254
255         will determine the job status
256
257       - otherwise, it means either all opcodes are queued, or success,
258         and the job status will be the same
259
260     @return: the job status
261
262     """
263     status = constants.JOB_STATUS_QUEUED
264
265     all_success = True
266     for op in self.ops:
267       if op.status == constants.OP_STATUS_SUCCESS:
268         continue
269
270       all_success = False
271
272       if op.status == constants.OP_STATUS_QUEUED:
273         pass
274       elif op.status == constants.OP_STATUS_WAITLOCK:
275         status = constants.JOB_STATUS_WAITLOCK
276       elif op.status == constants.OP_STATUS_RUNNING:
277         status = constants.JOB_STATUS_RUNNING
278       elif op.status == constants.OP_STATUS_CANCELING:
279         status = constants.JOB_STATUS_CANCELING
280         break
281       elif op.status == constants.OP_STATUS_ERROR:
282         status = constants.JOB_STATUS_ERROR
283         # The whole job fails if one opcode failed
284         break
285       elif op.status == constants.OP_STATUS_CANCELED:
286         status = constants.OP_STATUS_CANCELED
287         break
288
289     if all_success:
290       status = constants.JOB_STATUS_SUCCESS
291
292     return status
293
294   def GetLogEntries(self, newer_than):
295     """Selectively returns the log entries.
296
297     @type newer_than: None or int
298     @param newer_than: if this is None, return all log entries,
299         otherwise return only the log entries with serial higher
300         than this value
301     @rtype: list
302     @return: the list of the log entries selected
303
304     """
305     if newer_than is None:
306       serial = -1
307     else:
308       serial = newer_than
309
310     entries = []
311     for op in self.ops:
312       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
313
314     return entries
315
316   def MarkUnfinishedOps(self, status, result):
317     """Mark unfinished opcodes with a given status and result.
318
319     This is an utility function for marking all running or waiting to
320     be run opcodes with a given status. Opcodes which are already
321     finalised are not changed.
322
323     @param status: a given opcode status
324     @param result: the opcode result
325
326     """
327     not_marked = True
328     for op in self.ops:
329       if op.status in constants.OPS_FINALIZED:
330         assert not_marked, "Finalized opcodes found after non-finalized ones"
331         continue
332       op.status = status
333       op.result = result
334       not_marked = False
335
336
337 class _JobQueueWorker(workerpool.BaseWorker):
338   """The actual job workers.
339
340   """
341   def _NotifyStart(self):
342     """Mark the opcode as running, not lock-waiting.
343
344     This is called from the mcpu code as a notifier function, when the
345     LU is finally about to start the Exec() method. Of course, to have
346     end-user visible results, the opcode must be initially (before
347     calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
348
349     """
350     assert self.queue, "Queue attribute is missing"
351     assert self.opcode, "Opcode attribute is missing"
352
353     self.queue.acquire()
354     try:
355       assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
356                                     constants.OP_STATUS_CANCELING)
357
358       # Cancel here if we were asked to
359       if self.opcode.status == constants.OP_STATUS_CANCELING:
360         raise CancelJob()
361
362       self.opcode.status = constants.OP_STATUS_RUNNING
363     finally:
364       self.queue.release()
365
366   def RunTask(self, job):
367     """Job executor.
368
369     This functions processes a job. It is closely tied to the _QueuedJob and
370     _QueuedOpCode classes.
371
372     @type job: L{_QueuedJob}
373     @param job: the job to be processed
374
375     """
376     logging.info("Worker %s processing job %s",
377                   self.worker_id, job.id)
378     proc = mcpu.Processor(self.pool.queue.context)
379     self.queue = queue = job.queue
380     try:
381       try:
382         count = len(job.ops)
383         for idx, op in enumerate(job.ops):
384           op_summary = op.input.Summary()
385           if op.status == constants.OP_STATUS_SUCCESS:
386             # this is a job that was partially completed before master
387             # daemon shutdown, so it can be expected that some opcodes
388             # are already completed successfully (if any did error
389             # out, then the whole job should have been aborted and not
390             # resubmitted for processing)
391             logging.info("Op %s/%s: opcode %s already processed, skipping",
392                          idx + 1, count, op_summary)
393             continue
394           try:
395             logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
396                          op_summary)
397
398             queue.acquire()
399             try:
400               if op.status == constants.OP_STATUS_CANCELED:
401                 raise CancelJob()
402               assert op.status == constants.OP_STATUS_QUEUED
403               job.run_op_index = idx
404               op.status = constants.OP_STATUS_WAITLOCK
405               op.result = None
406               op.start_timestamp = TimeStampNow()
407               if idx == 0: # first opcode
408                 job.start_timestamp = op.start_timestamp
409               queue.UpdateJobUnlocked(job)
410
411               input_opcode = op.input
412             finally:
413               queue.release()
414
415             def _Log(*args):
416               """Append a log entry.
417
418               """
419               assert len(args) < 3
420
421               if len(args) == 1:
422                 log_type = constants.ELOG_MESSAGE
423                 log_msg = args[0]
424               else:
425                 log_type, log_msg = args
426
427               # The time is split to make serialization easier and not lose
428               # precision.
429               timestamp = utils.SplitTime(time.time())
430
431               queue.acquire()
432               try:
433                 job.log_serial += 1
434                 op.log.append((job.log_serial, timestamp, log_type, log_msg))
435
436                 job.change.notifyAll()
437               finally:
438                 queue.release()
439
440             # Make sure not to hold lock while _Log is called
441             self.opcode = op
442             result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
443
444             queue.acquire()
445             try:
446               op.status = constants.OP_STATUS_SUCCESS
447               op.result = result
448               op.end_timestamp = TimeStampNow()
449               queue.UpdateJobUnlocked(job)
450             finally:
451               queue.release()
452
453             logging.info("Op %s/%s: Successfully finished opcode %s",
454                          idx + 1, count, op_summary)
455           except CancelJob:
456             # Will be handled further up
457             raise
458           except Exception, err:
459             queue.acquire()
460             try:
461               try:
462                 op.status = constants.OP_STATUS_ERROR
463                 if isinstance(err, errors.GenericError):
464                   op.result = errors.EncodeException(err)
465                 else:
466                   op.result = str(err)
467                 op.end_timestamp = TimeStampNow()
468                 logging.info("Op %s/%s: Error in opcode %s: %s",
469                              idx + 1, count, op_summary, err)
470               finally:
471                 queue.UpdateJobUnlocked(job)
472             finally:
473               queue.release()
474             raise
475
476       except CancelJob:
477         queue.acquire()
478         try:
479           queue.CancelJobUnlocked(job)
480         finally:
481           queue.release()
482       except errors.GenericError, err:
483         logging.exception("Ganeti exception")
484       except:
485         logging.exception("Unhandled exception")
486     finally:
487       queue.acquire()
488       try:
489         try:
490           job.run_op_index = -1
491           job.end_timestamp = TimeStampNow()
492           queue.UpdateJobUnlocked(job)
493         finally:
494           job_id = job.id
495           status = job.CalcStatus()
496       finally:
497         queue.release()
498       logging.info("Worker %s finished job %s, status = %s",
499                    self.worker_id, job_id, status)
500
501
502 class _JobQueueWorkerPool(workerpool.WorkerPool):
503   """Simple class implementing a job-processing workerpool.
504
505   """
506   def __init__(self, queue):
507     super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
508                                               _JobQueueWorker)
509     self.queue = queue
510
511
512 def _RequireOpenQueue(fn):
513   """Decorator for "public" functions.
514
515   This function should be used for all 'public' functions. That is,
516   functions usually called from other classes. Note that this should
517   be applied only to methods (not plain functions), since it expects
518   that the decorated function is called with a first argument that has
519   a '_queue_lock' argument.
520
521   @warning: Use this decorator only after utils.LockedMethod!
522
523   Example::
524     @utils.LockedMethod
525     @_RequireOpenQueue
526     def Example(self):
527       pass
528
529   """
530   def wrapper(self, *args, **kwargs):
531     assert self._queue_lock is not None, "Queue should be open"
532     return fn(self, *args, **kwargs)
533   return wrapper
534
535
536 class JobQueue(object):
537   """Queue used to manage the jobs.
538
539   @cvar _RE_JOB_FILE: regex matching the valid job file names
540
541   """
542   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
543
544   def __init__(self, context):
545     """Constructor for JobQueue.
546
547     The constructor will initialize the job queue object and then
548     start loading the current jobs from disk, either for starting them
549     (if they were queue) or for aborting them (if they were already
550     running).
551
552     @type context: GanetiContext
553     @param context: the context object for access to the configuration
554         data and other ganeti objects
555
556     """
557     self.context = context
558     self._memcache = weakref.WeakValueDictionary()
559     self._my_hostname = utils.HostInfo().name
560
561     # Locking
562     self._lock = threading.Lock()
563     self.acquire = self._lock.acquire
564     self.release = self._lock.release
565
566     # Initialize
567     self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
568
569     # Read serial file
570     self._last_serial = jstore.ReadSerial()
571     assert self._last_serial is not None, ("Serial file was modified between"
572                                            " check in jstore and here")
573
574     # Get initial list of nodes
575     self._nodes = dict((n.name, n.primary_ip)
576                        for n in self.context.cfg.GetAllNodesInfo().values()
577                        if n.master_candidate)
578
579     # Remove master node
580     try:
581       del self._nodes[self._my_hostname]
582     except KeyError:
583       pass
584
585     # TODO: Check consistency across nodes
586
587     # Setup worker pool
588     self._wpool = _JobQueueWorkerPool(self)
589     try:
590       # We need to lock here because WorkerPool.AddTask() may start a job while
591       # we're still doing our work.
592       self.acquire()
593       try:
594         logging.info("Inspecting job queue")
595
596         all_job_ids = self._GetJobIDsUnlocked()
597         jobs_count = len(all_job_ids)
598         lastinfo = time.time()
599         for idx, job_id in enumerate(all_job_ids):
600           # Give an update every 1000 jobs or 10 seconds
601           if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
602               idx == (jobs_count - 1)):
603             logging.info("Job queue inspection: %d/%d (%0.1f %%)",
604                          idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
605             lastinfo = time.time()
606
607           job = self._LoadJobUnlocked(job_id)
608
609           # a failure in loading the job can cause 'None' to be returned
610           if job is None:
611             continue
612
613           status = job.CalcStatus()
614
615           if status in (constants.JOB_STATUS_QUEUED, ):
616             self._wpool.AddTask(job)
617
618           elif status in (constants.JOB_STATUS_RUNNING,
619                           constants.JOB_STATUS_WAITLOCK,
620                           constants.JOB_STATUS_CANCELING):
621             logging.warning("Unfinished job %s found: %s", job.id, job)
622             try:
623               job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
624                                     "Unclean master daemon shutdown")
625             finally:
626               self.UpdateJobUnlocked(job)
627
628         logging.info("Job queue inspection finished")
629       finally:
630         self.release()
631     except:
632       self._wpool.TerminateWorkers()
633       raise
634
635   @utils.LockedMethod
636   @_RequireOpenQueue
637   def AddNode(self, node):
638     """Register a new node with the queue.
639
640     @type node: L{objects.Node}
641     @param node: the node object to be added
642
643     """
644     node_name = node.name
645     assert node_name != self._my_hostname
646
647     # Clean queue directory on added node
648     rpc.RpcRunner.call_jobqueue_purge(node_name)
649
650     if not node.master_candidate:
651       # remove if existing, ignoring errors
652       self._nodes.pop(node_name, None)
653       # and skip the replication of the job ids
654       return
655
656     # Upload the whole queue excluding archived jobs
657     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
658
659     # Upload current serial file
660     files.append(constants.JOB_QUEUE_SERIAL_FILE)
661
662     for file_name in files:
663       # Read file content
664       fd = open(file_name, "r")
665       try:
666         content = fd.read()
667       finally:
668         fd.close()
669
670       result = rpc.RpcRunner.call_jobqueue_update([node_name],
671                                                   [node.primary_ip],
672                                                   file_name, content)
673       if not result[node_name]:
674         logging.error("Failed to upload %s to %s", file_name, node_name)
675
676     self._nodes[node_name] = node.primary_ip
677
678   @utils.LockedMethod
679   @_RequireOpenQueue
680   def RemoveNode(self, node_name):
681     """Callback called when removing nodes from the cluster.
682
683     @type node_name: str
684     @param node_name: the name of the node to remove
685
686     """
687     try:
688       # The queue is removed by the "leave node" RPC call.
689       del self._nodes[node_name]
690     except KeyError:
691       pass
692
693   def _CheckRpcResult(self, result, nodes, failmsg):
694     """Verifies the status of an RPC call.
695
696     Since we aim to keep consistency should this node (the current
697     master) fail, we will log errors if our rpc fail, and especially
698     log the case when more than half of the nodes fails.
699
700     @param result: the data as returned from the rpc call
701     @type nodes: list
702     @param nodes: the list of nodes we made the call to
703     @type failmsg: str
704     @param failmsg: the identifier to be used for logging
705
706     """
707     failed = []
708     success = []
709
710     for node in nodes:
711       if result[node]:
712         success.append(node)
713       else:
714         failed.append(node)
715
716     if failed:
717       logging.error("%s failed on %s", failmsg, ", ".join(failed))
718
719     # +1 for the master node
720     if (len(success) + 1) < len(failed):
721       # TODO: Handle failing nodes
722       logging.error("More than half of the nodes failed")
723
724   def _GetNodeIp(self):
725     """Helper for returning the node name/ip list.
726
727     @rtype: (list, list)
728     @return: a tuple of two lists, the first one with the node
729         names and the second one with the node addresses
730
731     """
732     name_list = self._nodes.keys()
733     addr_list = [self._nodes[name] for name in name_list]
734     return name_list, addr_list
735
736   def _WriteAndReplicateFileUnlocked(self, file_name, data):
737     """Writes a file locally and then replicates it to all nodes.
738
739     This function will replace the contents of a file on the local
740     node and then replicate it to all the other nodes we have.
741
742     @type file_name: str
743     @param file_name: the path of the file to be replicated
744     @type data: str
745     @param data: the new contents of the file
746
747     """
748     utils.WriteFile(file_name, data=data)
749
750     names, addrs = self._GetNodeIp()
751     result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
752     self._CheckRpcResult(result, self._nodes,
753                          "Updating %s" % file_name)
754
755   def _RenameFilesUnlocked(self, rename):
756     """Renames a file locally and then replicate the change.
757
758     This function will rename a file in the local queue directory
759     and then replicate this rename to all the other nodes we have.
760
761     @type rename: list of (old, new)
762     @param rename: List containing tuples mapping old to new names
763
764     """
765     # Rename them locally
766     for old, new in rename:
767       utils.RenameFile(old, new, mkdir=True)
768
769     # ... and on all nodes
770     names, addrs = self._GetNodeIp()
771     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
772     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
773
774   def _FormatJobID(self, job_id):
775     """Convert a job ID to string format.
776
777     Currently this just does C{str(job_id)} after performing some
778     checks, but if we want to change the job id format this will
779     abstract this change.
780
781     @type job_id: int or long
782     @param job_id: the numeric job id
783     @rtype: str
784     @return: the formatted job id
785
786     """
787     if not isinstance(job_id, (int, long)):
788       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
789     if job_id < 0:
790       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
791
792     return str(job_id)
793
794   @classmethod
795   def _GetArchiveDirectory(cls, job_id):
796     """Returns the archive directory for a job.
797
798     @type job_id: str
799     @param job_id: Job identifier
800     @rtype: str
801     @return: Directory name
802
803     """
804     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
805
806   def _NewSerialsUnlocked(self, count):
807     """Generates a new job identifier.
808
809     Job identifiers are unique during the lifetime of a cluster.
810
811     @type count: integer
812     @param count: how many serials to return
813     @rtype: str
814     @return: a string representing the job identifier.
815
816     """
817     assert count > 0
818     # New number
819     serial = self._last_serial + count
820
821     # Write to file
822     self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
823                                         "%s\n" % serial)
824
825     result = [self._FormatJobID(v)
826               for v in range(self._last_serial, serial + 1)]
827     # Keep it only if we were able to write the file
828     self._last_serial = serial
829
830     return result
831
832   @staticmethod
833   def _GetJobPath(job_id):
834     """Returns the job file for a given job id.
835
836     @type job_id: str
837     @param job_id: the job identifier
838     @rtype: str
839     @return: the path to the job file
840
841     """
842     return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
843
844   @classmethod
845   def _GetArchivedJobPath(cls, job_id):
846     """Returns the archived job file for a give job id.
847
848     @type job_id: str
849     @param job_id: the job identifier
850     @rtype: str
851     @return: the path to the archived job file
852
853     """
854     path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
855     return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
856
857   @classmethod
858   def _ExtractJobID(cls, name):
859     """Extract the job id from a filename.
860
861     @type name: str
862     @param name: the job filename
863     @rtype: job id or None
864     @return: the job id corresponding to the given filename,
865         or None if the filename does not represent a valid
866         job file
867
868     """
869     m = cls._RE_JOB_FILE.match(name)
870     if m:
871       return m.group(1)
872     else:
873       return None
874
875   def _GetJobIDsUnlocked(self, archived=False):
876     """Return all known job IDs.
877
878     If the parameter archived is True, archived jobs IDs will be
879     included. Currently this argument is unused.
880
881     The method only looks at disk because it's a requirement that all
882     jobs are present on disk (so in the _memcache we don't have any
883     extra IDs).
884
885     @rtype: list
886     @return: the list of job IDs
887
888     """
889     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
890     jlist = utils.NiceSort(jlist)
891     return jlist
892
893   def _ListJobFiles(self):
894     """Returns the list of current job files.
895
896     @rtype: list
897     @return: the list of job file names
898
899     """
900     return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
901             if self._RE_JOB_FILE.match(name)]
902
903   def _LoadJobUnlocked(self, job_id):
904     """Loads a job from the disk or memory.
905
906     Given a job id, this will return the cached job object if
907     existing, or try to load the job from the disk. If loading from
908     disk, it will also add the job to the cache.
909
910     @param job_id: the job id
911     @rtype: L{_QueuedJob} or None
912     @return: either None or the job object
913
914     """
915     job = self._memcache.get(job_id, None)
916     if job:
917       logging.debug("Found job %s in memcache", job_id)
918       return job
919
920     filepath = self._GetJobPath(job_id)
921     logging.debug("Loading job from %s", filepath)
922     try:
923       fd = open(filepath, "r")
924     except IOError, err:
925       if err.errno in (errno.ENOENT, ):
926         return None
927       raise
928     try:
929       data = serializer.LoadJson(fd.read())
930     finally:
931       fd.close()
932
933     try:
934       job = _QueuedJob.Restore(self, data)
935     except Exception, err:
936       new_path = self._GetArchivedJobPath(job_id)
937       if filepath == new_path:
938         # job already archived (future case)
939         logging.exception("Can't parse job %s", job_id)
940       else:
941         # non-archived case
942         logging.exception("Can't parse job %s, will archive.", job_id)
943         self._RenameFilesUnlocked([(filepath, new_path)])
944       return None
945
946     self._memcache[job_id] = job
947     logging.debug("Added job %s to the cache", job_id)
948     return job
949
950   def _GetJobsUnlocked(self, job_ids):
951     """Return a list of jobs based on their IDs.
952
953     @type job_ids: list
954     @param job_ids: either an empty list (meaning all jobs),
955         or a list of job IDs
956     @rtype: list
957     @return: the list of job objects
958
959     """
960     if not job_ids:
961       job_ids = self._GetJobIDsUnlocked()
962
963     return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
964
965   @staticmethod
966   def _IsQueueMarkedDrain():
967     """Check if the queue is marked from drain.
968
969     This currently uses the queue drain file, which makes it a
970     per-node flag. In the future this can be moved to the config file.
971
972     @rtype: boolean
973     @return: True of the job queue is marked for draining
974
975     """
976     return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
977
978   @staticmethod
979   def SetDrainFlag(drain_flag):
980     """Sets the drain flag for the queue.
981
982     This is similar to the function L{backend.JobQueueSetDrainFlag},
983     and in the future we might merge them.
984
985     @type drain_flag: boolean
986     @param drain_flag: Whether to set or unset the drain flag
987
988     """
989     if drain_flag:
990       utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
991     else:
992       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
993     return True
994
995   @_RequireOpenQueue
996   def _SubmitJobUnlocked(self, job_id, ops):
997     """Create and store a new job.
998
999     This enters the job into our job queue and also puts it on the new
1000     queue, in order for it to be picked up by the queue processors.
1001
1002     @type job_id: job ID
1003     @param jod_id: the job ID for the new job
1004     @type ops: list
1005     @param ops: The list of OpCodes that will become the new job.
1006     @rtype: job ID
1007     @return: the job ID of the newly created job
1008     @raise errors.JobQueueDrainError: if the job is marked for draining
1009
1010     """
1011     if self._IsQueueMarkedDrain():
1012       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1013
1014     # Check job queue size
1015     size = len(self._ListJobFiles())
1016     if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
1017       # TODO: Autoarchive jobs. Make sure it's not done on every job
1018       # submission, though.
1019       #size = ...
1020       pass
1021
1022     if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1023       raise errors.JobQueueFull()
1024
1025     job = _QueuedJob(self, job_id, ops)
1026
1027     # Write to disk
1028     self.UpdateJobUnlocked(job)
1029
1030     logging.debug("Adding new job %s to the cache", job_id)
1031     self._memcache[job_id] = job
1032
1033     # Add to worker pool
1034     self._wpool.AddTask(job)
1035
1036     return job.id
1037
1038   @utils.LockedMethod
1039   @_RequireOpenQueue
1040   def SubmitJob(self, ops):
1041     """Create and store a new job.
1042
1043     @see: L{_SubmitJobUnlocked}
1044
1045     """
1046     job_id = self._NewSerialsUnlocked(1)[0]
1047     return self._SubmitJobUnlocked(job_id, ops)
1048
1049   @utils.LockedMethod
1050   @_RequireOpenQueue
1051   def SubmitManyJobs(self, jobs):
1052     """Create and store multiple jobs.
1053
1054     @see: L{_SubmitJobUnlocked}
1055
1056     """
1057     results = []
1058     all_job_ids = self._NewSerialsUnlocked(len(jobs))
1059     for job_id, ops in zip(all_job_ids, jobs):
1060       try:
1061         data = self._SubmitJobUnlocked(job_id, ops)
1062         status = True
1063       except errors.GenericError, err:
1064         data = str(err)
1065         status = False
1066       results.append((status, data))
1067
1068     return results
1069
1070
1071   @_RequireOpenQueue
1072   def UpdateJobUnlocked(self, job):
1073     """Update a job's on disk storage.
1074
1075     After a job has been modified, this function needs to be called in
1076     order to write the changes to disk and replicate them to the other
1077     nodes.
1078
1079     @type job: L{_QueuedJob}
1080     @param job: the changed job
1081
1082     """
1083     filename = self._GetJobPath(job.id)
1084     data = serializer.DumpJson(job.Serialize(), indent=False)
1085     logging.debug("Writing job %s to %s", job.id, filename)
1086     self._WriteAndReplicateFileUnlocked(filename, data)
1087
1088     # Notify waiters about potential changes
1089     job.change.notifyAll()
1090
1091   @utils.LockedMethod
1092   @_RequireOpenQueue
1093   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1094                         timeout):
1095     """Waits for changes in a job.
1096
1097     @type job_id: string
1098     @param job_id: Job identifier
1099     @type fields: list of strings
1100     @param fields: Which fields to check for changes
1101     @type prev_job_info: list or None
1102     @param prev_job_info: Last job information returned
1103     @type prev_log_serial: int
1104     @param prev_log_serial: Last job message serial number
1105     @type timeout: float
1106     @param timeout: maximum time to wait
1107     @rtype: tuple (job info, log entries)
1108     @return: a tuple of the job information as required via
1109         the fields parameter, and the log entries as a list
1110
1111         if the job has not changed and the timeout has expired,
1112         we instead return a special value,
1113         L{constants.JOB_NOTCHANGED}, which should be interpreted
1114         as such by the clients
1115
1116     """
1117     logging.debug("Waiting for changes in job %s", job_id)
1118
1119     job_info = None
1120     log_entries = None
1121
1122     end_time = time.time() + timeout
1123     while True:
1124       delta_time = end_time - time.time()
1125       if delta_time < 0:
1126         return constants.JOB_NOTCHANGED
1127
1128       job = self._LoadJobUnlocked(job_id)
1129       if not job:
1130         logging.debug("Job %s not found", job_id)
1131         break
1132
1133       status = job.CalcStatus()
1134       job_info = self._GetJobInfoUnlocked(job, fields)
1135       log_entries = job.GetLogEntries(prev_log_serial)
1136
1137       # Serializing and deserializing data can cause type changes (e.g. from
1138       # tuple to list) or precision loss. We're doing it here so that we get
1139       # the same modifications as the data received from the client. Without
1140       # this, the comparison afterwards might fail without the data being
1141       # significantly different.
1142       job_info = serializer.LoadJson(serializer.DumpJson(job_info))
1143       log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
1144
1145       if status not in (constants.JOB_STATUS_QUEUED,
1146                         constants.JOB_STATUS_RUNNING,
1147                         constants.JOB_STATUS_WAITLOCK):
1148         # Don't even try to wait if the job is no longer running, there will be
1149         # no changes.
1150         break
1151
1152       if (prev_job_info != job_info or
1153           (log_entries and prev_log_serial != log_entries[0][0])):
1154         break
1155
1156       logging.debug("Waiting again")
1157
1158       # Release the queue lock while waiting
1159       job.change.wait(delta_time)
1160
1161     logging.debug("Job %s changed", job_id)
1162
1163     if job_info is None and log_entries is None:
1164       return None
1165     else:
1166       return (job_info, log_entries)
1167
1168   @utils.LockedMethod
1169   @_RequireOpenQueue
1170   def CancelJob(self, job_id):
1171     """Cancels a job.
1172
1173     This will only succeed if the job has not started yet.
1174
1175     @type job_id: string
1176     @param job_id: job ID of job to be cancelled.
1177
1178     """
1179     logging.info("Cancelling job %s", job_id)
1180
1181     job = self._LoadJobUnlocked(job_id)
1182     if not job:
1183       logging.debug("Job %s not found", job_id)
1184       return (False, "Job %s not found" % job_id)
1185
1186     job_status = job.CalcStatus()
1187
1188     if job_status not in (constants.JOB_STATUS_QUEUED,
1189                           constants.JOB_STATUS_WAITLOCK):
1190       logging.debug("Job %s is no longer waiting in the queue", job.id)
1191       return (False, "Job %s is no longer waiting in the queue" % job.id)
1192
1193     if job_status == constants.JOB_STATUS_QUEUED:
1194       self.CancelJobUnlocked(job)
1195       return (True, "Job %s canceled" % job.id)
1196
1197     elif job_status == constants.JOB_STATUS_WAITLOCK:
1198       # The worker will notice the new status and cancel the job
1199       try:
1200         job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1201       finally:
1202         self.UpdateJobUnlocked(job)
1203       return (True, "Job %s will be canceled" % job.id)
1204
1205   @_RequireOpenQueue
1206   def CancelJobUnlocked(self, job):
1207     """Marks a job as canceled.
1208
1209     """
1210     try:
1211       job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1212                             "Job canceled by request")
1213     finally:
1214       self.UpdateJobUnlocked(job)
1215
1216   @_RequireOpenQueue
1217   def _ArchiveJobsUnlocked(self, jobs):
1218     """Archives jobs.
1219
1220     @type jobs: list of L{_QueuedJob}
1221     @param jobs: Job objects
1222     @rtype: int
1223     @return: Number of archived jobs
1224
1225     """
1226     archive_jobs = []
1227     rename_files = []
1228     for job in jobs:
1229       if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1230                                   constants.JOB_STATUS_SUCCESS,
1231                                   constants.JOB_STATUS_ERROR):
1232         logging.debug("Job %s is not yet done", job.id)
1233         continue
1234
1235       archive_jobs.append(job)
1236
1237       old = self._GetJobPath(job.id)
1238       new = self._GetArchivedJobPath(job.id)
1239       rename_files.append((old, new))
1240
1241     # TODO: What if 1..n files fail to rename?
1242     self._RenameFilesUnlocked(rename_files)
1243
1244     logging.debug("Successfully archived job(s) %s",
1245                   ", ".join(job.id for job in archive_jobs))
1246
1247     return len(archive_jobs)
1248
1249   @utils.LockedMethod
1250   @_RequireOpenQueue
1251   def ArchiveJob(self, job_id):
1252     """Archives a job.
1253
1254     This is just a wrapper over L{_ArchiveJobsUnlocked}.
1255
1256     @type job_id: string
1257     @param job_id: Job ID of job to be archived.
1258     @rtype: bool
1259     @return: Whether job was archived
1260
1261     """
1262     logging.info("Archiving job %s", job_id)
1263
1264     job = self._LoadJobUnlocked(job_id)
1265     if not job:
1266       logging.debug("Job %s not found", job_id)
1267       return False
1268
1269     return self._ArchiveJobsUnlocked([job]) == 1
1270
1271   @utils.LockedMethod
1272   @_RequireOpenQueue
1273   def AutoArchiveJobs(self, age, timeout):
1274     """Archives all jobs based on age.
1275
1276     The method will archive all jobs which are older than the age
1277     parameter. For jobs that don't have an end timestamp, the start
1278     timestamp will be considered. The special '-1' age will cause
1279     archival of all jobs (that are not running or queued).
1280
1281     @type age: int
1282     @param age: the minimum age in seconds
1283
1284     """
1285     logging.info("Archiving jobs with age more than %s seconds", age)
1286
1287     now = time.time()
1288     end_time = now + timeout
1289     archived_count = 0
1290     last_touched = 0
1291
1292     all_job_ids = self._GetJobIDsUnlocked(archived=False)
1293     pending = []
1294     for idx, job_id in enumerate(all_job_ids):
1295       last_touched = idx
1296
1297       # Not optimal because jobs could be pending
1298       # TODO: Measure average duration for job archival and take number of
1299       # pending jobs into account.
1300       if time.time() > end_time:
1301         break
1302
1303       # Returns None if the job failed to load
1304       job = self._LoadJobUnlocked(job_id)
1305       if job:
1306         if job.end_timestamp is None:
1307           if job.start_timestamp is None:
1308             job_age = job.received_timestamp
1309           else:
1310             job_age = job.start_timestamp
1311         else:
1312           job_age = job.end_timestamp
1313
1314         if age == -1 or now - job_age[0] > age:
1315           pending.append(job)
1316
1317           # Archive 10 jobs at a time
1318           if len(pending) >= 10:
1319             archived_count += self._ArchiveJobsUnlocked(pending)
1320             pending = []
1321
1322     if pending:
1323       archived_count += self._ArchiveJobsUnlocked(pending)
1324
1325     return (archived_count, len(all_job_ids) - last_touched - 1)
1326
1327   def _GetJobInfoUnlocked(self, job, fields):
1328     """Returns information about a job.
1329
1330     @type job: L{_QueuedJob}
1331     @param job: the job which we query
1332     @type fields: list
1333     @param fields: names of fields to return
1334     @rtype: list
1335     @return: list with one element for each field
1336     @raise errors.OpExecError: when an invalid field
1337         has been passed
1338
1339     """
1340     row = []
1341     for fname in fields:
1342       if fname == "id":
1343         row.append(job.id)
1344       elif fname == "status":
1345         row.append(job.CalcStatus())
1346       elif fname == "ops":
1347         row.append([op.input.__getstate__() for op in job.ops])
1348       elif fname == "opresult":
1349         row.append([op.result for op in job.ops])
1350       elif fname == "opstatus":
1351         row.append([op.status for op in job.ops])
1352       elif fname == "oplog":
1353         row.append([op.log for op in job.ops])
1354       elif fname == "opstart":
1355         row.append([op.start_timestamp for op in job.ops])
1356       elif fname == "opend":
1357         row.append([op.end_timestamp for op in job.ops])
1358       elif fname == "received_ts":
1359         row.append(job.received_timestamp)
1360       elif fname == "start_ts":
1361         row.append(job.start_timestamp)
1362       elif fname == "end_ts":
1363         row.append(job.end_timestamp)
1364       elif fname == "summary":
1365         row.append([op.input.Summary() for op in job.ops])
1366       else:
1367         raise errors.OpExecError("Invalid job query field '%s'" % fname)
1368     return row
1369
1370   @utils.LockedMethod
1371   @_RequireOpenQueue
1372   def QueryJobs(self, job_ids, fields):
1373     """Returns a list of jobs in queue.
1374
1375     This is a wrapper of L{_GetJobsUnlocked}, which actually does the
1376     processing for each job.
1377
1378     @type job_ids: list
1379     @param job_ids: sequence of job identifiers or None for all
1380     @type fields: list
1381     @param fields: names of fields to return
1382     @rtype: list
1383     @return: list one element per job, each element being list with
1384         the requested fields
1385
1386     """
1387     jobs = []
1388
1389     for job in self._GetJobsUnlocked(job_ids):
1390       if job is None:
1391         jobs.append(None)
1392       else:
1393         jobs.append(self._GetJobInfoUnlocked(job, fields))
1394
1395     return jobs
1396
1397   @utils.LockedMethod
1398   @_RequireOpenQueue
1399   def Shutdown(self):
1400     """Stops the job queue.
1401
1402     This shutdowns all the worker threads an closes the queue.
1403
1404     """
1405     self._wpool.TerminateWorkers()
1406
1407     self._queue_lock.Close()
1408     self._queue_lock = None