Code and docstring style fixes
[ganeti-local] / lib / jqueue.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the job queue handling.
23
24 Locking: there's a single, large lock in the L{JobQueue} class. It's
25 used by all other classes in this module.
26
27 @var JOBQUEUE_THREADS: the number of worker threads we start for
28     processing jobs
29
30 """
31
32 import os
33 import logging
34 import threading
35 import errno
36 import re
37 import time
38 import weakref
39
40 from ganeti import constants
41 from ganeti import serializer
42 from ganeti import workerpool
43 from ganeti import opcodes
44 from ganeti import errors
45 from ganeti import mcpu
46 from ganeti import utils
47 from ganeti import jstore
48 from ganeti import rpc
49
50
51 JOBQUEUE_THREADS = 25
52 JOBS_PER_ARCHIVE_DIRECTORY = 10000
53
54
55 class CancelJob(Exception):
56   """Special exception to cancel a job.
57
58   """
59
60
61 def TimeStampNow():
62   """Returns the current timestamp.
63
64   @rtype: tuple
65   @return: the current time in the (seconds, microseconds) format
66
67   """
68   return utils.SplitTime(time.time())
69
70
71 class _QueuedOpCode(object):
72   """Encapsulates an opcode object.
73
74   @ivar log: holds the execution log and consists of tuples
75   of the form C{(log_serial, timestamp, level, message)}
76   @ivar input: the OpCode we encapsulate
77   @ivar status: the current status
78   @ivar result: the result of the LU execution
79   @ivar start_timestamp: timestamp for the start of the execution
80   @ivar stop_timestamp: timestamp for the end of the execution
81
82   """
83   __slots__ = ["input", "status", "result", "log",
84                "start_timestamp", "end_timestamp",
85                "__weakref__"]
86
87   def __init__(self, op):
88     """Constructor for the _QuededOpCode.
89
90     @type op: L{opcodes.OpCode}
91     @param op: the opcode we encapsulate
92
93     """
94     self.input = op
95     self.status = constants.OP_STATUS_QUEUED
96     self.result = None
97     self.log = []
98     self.start_timestamp = None
99     self.end_timestamp = None
100
101   @classmethod
102   def Restore(cls, state):
103     """Restore the _QueuedOpCode from the serialized form.
104
105     @type state: dict
106     @param state: the serialized state
107     @rtype: _QueuedOpCode
108     @return: a new _QueuedOpCode instance
109
110     """
111     obj = _QueuedOpCode.__new__(cls)
112     obj.input = opcodes.OpCode.LoadOpCode(state["input"])
113     obj.status = state["status"]
114     obj.result = state["result"]
115     obj.log = state["log"]
116     obj.start_timestamp = state.get("start_timestamp", None)
117     obj.end_timestamp = state.get("end_timestamp", None)
118     return obj
119
120   def Serialize(self):
121     """Serializes this _QueuedOpCode.
122
123     @rtype: dict
124     @return: the dictionary holding the serialized state
125
126     """
127     return {
128       "input": self.input.__getstate__(),
129       "status": self.status,
130       "result": self.result,
131       "log": self.log,
132       "start_timestamp": self.start_timestamp,
133       "end_timestamp": self.end_timestamp,
134       }
135
136
137 class _QueuedJob(object):
138   """In-memory job representation.
139
140   This is what we use to track the user-submitted jobs. Locking must
141   be taken care of by users of this class.
142
143   @type queue: L{JobQueue}
144   @ivar queue: the parent queue
145   @ivar id: the job ID
146   @type ops: list
147   @ivar ops: the list of _QueuedOpCode that constitute the job
148   @type log_serial: int
149   @ivar log_serial: holds the index for the next log entry
150   @ivar received_timestamp: the timestamp for when the job was received
151   @ivar start_timestmap: the timestamp for start of execution
152   @ivar end_timestamp: the timestamp for end of execution
153   @ivar lock_status: In-memory locking information for debugging
154   @ivar change: a Condition variable we use for waiting for job changes
155
156   """
157   __slots__ = ["queue", "id", "ops", "log_serial",
158                "received_timestamp", "start_timestamp", "end_timestamp",
159                "lock_status", "change",
160                "__weakref__"]
161
162   def __init__(self, queue, job_id, ops):
163     """Constructor for the _QueuedJob.
164
165     @type queue: L{JobQueue}
166     @param queue: our parent queue
167     @type job_id: job_id
168     @param job_id: our job id
169     @type ops: list
170     @param ops: the list of opcodes we hold, which will be encapsulated
171         in _QueuedOpCodes
172
173     """
174     if not ops:
175       # TODO: use a better exception
176       raise Exception("No opcodes")
177
178     self.queue = queue
179     self.id = job_id
180     self.ops = [_QueuedOpCode(op) for op in ops]
181     self.log_serial = 0
182     self.received_timestamp = TimeStampNow()
183     self.start_timestamp = None
184     self.end_timestamp = None
185
186     # In-memory attributes
187     self.lock_status = None
188
189     # Condition to wait for changes
190     self.change = threading.Condition(self.queue._lock)
191
192   @classmethod
193   def Restore(cls, queue, state):
194     """Restore a _QueuedJob from serialized state:
195
196     @type queue: L{JobQueue}
197     @param queue: to which queue the restored job belongs
198     @type state: dict
199     @param state: the serialized state
200     @rtype: _JobQueue
201     @return: the restored _JobQueue instance
202
203     """
204     obj = _QueuedJob.__new__(cls)
205     obj.queue = queue
206     obj.id = state["id"]
207     obj.received_timestamp = state.get("received_timestamp", None)
208     obj.start_timestamp = state.get("start_timestamp", None)
209     obj.end_timestamp = state.get("end_timestamp", None)
210
211     # In-memory attributes
212     obj.lock_status = None
213
214     obj.ops = []
215     obj.log_serial = 0
216     for op_state in state["ops"]:
217       op = _QueuedOpCode.Restore(op_state)
218       for log_entry in op.log:
219         obj.log_serial = max(obj.log_serial, log_entry[0])
220       obj.ops.append(op)
221
222     # Condition to wait for changes
223     obj.change = threading.Condition(obj.queue._lock)
224
225     return obj
226
227   def Serialize(self):
228     """Serialize the _JobQueue instance.
229
230     @rtype: dict
231     @return: the serialized state
232
233     """
234     return {
235       "id": self.id,
236       "ops": [op.Serialize() for op in self.ops],
237       "start_timestamp": self.start_timestamp,
238       "end_timestamp": self.end_timestamp,
239       "received_timestamp": self.received_timestamp,
240       }
241
242   def CalcStatus(self):
243     """Compute the status of this job.
244
245     This function iterates over all the _QueuedOpCodes in the job and
246     based on their status, computes the job status.
247
248     The algorithm is:
249       - if we find a cancelled, or finished with error, the job
250         status will be the same
251       - otherwise, the last opcode with the status one of:
252           - waitlock
253           - canceling
254           - running
255
256         will determine the job status
257
258       - otherwise, it means either all opcodes are queued, or success,
259         and the job status will be the same
260
261     @return: the job status
262
263     """
264     status = constants.JOB_STATUS_QUEUED
265
266     all_success = True
267     for op in self.ops:
268       if op.status == constants.OP_STATUS_SUCCESS:
269         continue
270
271       all_success = False
272
273       if op.status == constants.OP_STATUS_QUEUED:
274         pass
275       elif op.status == constants.OP_STATUS_WAITLOCK:
276         status = constants.JOB_STATUS_WAITLOCK
277       elif op.status == constants.OP_STATUS_RUNNING:
278         status = constants.JOB_STATUS_RUNNING
279       elif op.status == constants.OP_STATUS_CANCELING:
280         status = constants.JOB_STATUS_CANCELING
281         break
282       elif op.status == constants.OP_STATUS_ERROR:
283         status = constants.JOB_STATUS_ERROR
284         # The whole job fails if one opcode failed
285         break
286       elif op.status == constants.OP_STATUS_CANCELED:
287         status = constants.OP_STATUS_CANCELED
288         break
289
290     if all_success:
291       status = constants.JOB_STATUS_SUCCESS
292
293     return status
294
295   def GetLogEntries(self, newer_than):
296     """Selectively returns the log entries.
297
298     @type newer_than: None or int
299     @param newer_than: if this is None, return all log entries,
300         otherwise return only the log entries with serial higher
301         than this value
302     @rtype: list
303     @return: the list of the log entries selected
304
305     """
306     if newer_than is None:
307       serial = -1
308     else:
309       serial = newer_than
310
311     entries = []
312     for op in self.ops:
313       entries.extend(filter(lambda entry: entry[0] > serial, op.log))
314
315     return entries
316
317   def MarkUnfinishedOps(self, status, result):
318     """Mark unfinished opcodes with a given status and result.
319
320     This is an utility function for marking all running or waiting to
321     be run opcodes with a given status. Opcodes which are already
322     finalised are not changed.
323
324     @param status: a given opcode status
325     @param result: the opcode result
326
327     """
328     not_marked = True
329     for op in self.ops:
330       if op.status in constants.OPS_FINALIZED:
331         assert not_marked, "Finalized opcodes found after non-finalized ones"
332         continue
333       op.status = status
334       op.result = result
335       not_marked = False
336
337
338 class _OpExecCallbacks(mcpu.OpExecCbBase):
339   def __init__(self, queue, job, op):
340     """Initializes this class.
341
342     @type queue: L{JobQueue}
343     @param queue: Job queue
344     @type job: L{_QueuedJob}
345     @param job: Job object
346     @type op: L{_QueuedOpCode}
347     @param op: OpCode
348
349     """
350     assert queue, "Queue is missing"
351     assert job, "Job is missing"
352     assert op, "Opcode is missing"
353
354     self._queue = queue
355     self._job = job
356     self._op = op
357
358   def NotifyStart(self):
359     """Mark the opcode as running, not lock-waiting.
360
361     This is called from the mcpu code as a notifier function, when the LU is
362     finally about to start the Exec() method. Of course, to have end-user
363     visible results, the opcode must be initially (before calling into
364     Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
365
366     """
367     self._queue.acquire()
368     try:
369       assert self._op.status in (constants.OP_STATUS_WAITLOCK,
370                                  constants.OP_STATUS_CANCELING)
371
372       # All locks are acquired by now
373       self._job.lock_status = None
374
375       # Cancel here if we were asked to
376       if self._op.status == constants.OP_STATUS_CANCELING:
377         raise CancelJob()
378
379       self._op.status = constants.OP_STATUS_RUNNING
380     finally:
381       self._queue.release()
382
383   def Feedback(self, *args):
384     """Append a log entry.
385
386     """
387     assert len(args) < 3
388
389     if len(args) == 1:
390       log_type = constants.ELOG_MESSAGE
391       log_msg = args[0]
392     else:
393       (log_type, log_msg) = args
394
395     # The time is split to make serialization easier and not lose
396     # precision.
397     timestamp = utils.SplitTime(time.time())
398
399     self._queue.acquire()
400     try:
401       self._job.log_serial += 1
402       self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
403
404       self._job.change.notifyAll()
405     finally:
406       self._queue.release()
407
408   def ReportLocks(self, msg):
409     """Write locking information to the job.
410
411     Called whenever the LU processor is waiting for a lock or has acquired one.
412
413     """
414     # Not getting the queue lock because this is a single assignment
415     self._job.lock_status = msg
416
417
418 class _JobQueueWorker(workerpool.BaseWorker):
419   """The actual job workers.
420
421   """
422   def RunTask(self, job):
423     """Job executor.
424
425     This functions processes a job. It is closely tied to the _QueuedJob and
426     _QueuedOpCode classes.
427
428     @type job: L{_QueuedJob}
429     @param job: the job to be processed
430
431     """
432     logging.info("Worker %s processing job %s",
433                   self.worker_id, job.id)
434     proc = mcpu.Processor(self.pool.queue.context)
435     queue = job.queue
436     try:
437       try:
438         count = len(job.ops)
439         for idx, op in enumerate(job.ops):
440           op_summary = op.input.Summary()
441           if op.status == constants.OP_STATUS_SUCCESS:
442             # this is a job that was partially completed before master
443             # daemon shutdown, so it can be expected that some opcodes
444             # are already completed successfully (if any did error
445             # out, then the whole job should have been aborted and not
446             # resubmitted for processing)
447             logging.info("Op %s/%s: opcode %s already processed, skipping",
448                          idx + 1, count, op_summary)
449             continue
450           try:
451             logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
452                          op_summary)
453
454             queue.acquire()
455             try:
456               if op.status == constants.OP_STATUS_CANCELED:
457                 raise CancelJob()
458               assert op.status == constants.OP_STATUS_QUEUED
459               op.status = constants.OP_STATUS_WAITLOCK
460               op.result = None
461               op.start_timestamp = TimeStampNow()
462               if idx == 0: # first opcode
463                 job.start_timestamp = op.start_timestamp
464               queue.UpdateJobUnlocked(job)
465
466               input_opcode = op.input
467             finally:
468               queue.release()
469
470             # Make sure not to hold queue lock while calling ExecOpCode
471             result = proc.ExecOpCode(input_opcode,
472                                      _OpExecCallbacks(queue, job, op))
473
474             queue.acquire()
475             try:
476               op.status = constants.OP_STATUS_SUCCESS
477               op.result = result
478               op.end_timestamp = TimeStampNow()
479               queue.UpdateJobUnlocked(job)
480             finally:
481               queue.release()
482
483             logging.info("Op %s/%s: Successfully finished opcode %s",
484                          idx + 1, count, op_summary)
485           except CancelJob:
486             # Will be handled further up
487             raise
488           except Exception, err:
489             queue.acquire()
490             try:
491               try:
492                 op.status = constants.OP_STATUS_ERROR
493                 if isinstance(err, errors.GenericError):
494                   op.result = errors.EncodeException(err)
495                 else:
496                   op.result = str(err)
497                 op.end_timestamp = TimeStampNow()
498                 logging.info("Op %s/%s: Error in opcode %s: %s",
499                              idx + 1, count, op_summary, err)
500               finally:
501                 queue.UpdateJobUnlocked(job)
502             finally:
503               queue.release()
504             raise
505
506       except CancelJob:
507         queue.acquire()
508         try:
509           queue.CancelJobUnlocked(job)
510         finally:
511           queue.release()
512       except errors.GenericError, err:
513         logging.exception("Ganeti exception")
514       except:
515         logging.exception("Unhandled exception")
516     finally:
517       queue.acquire()
518       try:
519         try:
520           job.lock_status = None
521           job.end_timestamp = TimeStampNow()
522           queue.UpdateJobUnlocked(job)
523         finally:
524           job_id = job.id
525           status = job.CalcStatus()
526       finally:
527         queue.release()
528
529       logging.info("Worker %s finished job %s, status = %s",
530                    self.worker_id, job_id, status)
531
532
533 class _JobQueueWorkerPool(workerpool.WorkerPool):
534   """Simple class implementing a job-processing workerpool.
535
536   """
537   def __init__(self, queue):
538     super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
539                                               _JobQueueWorker)
540     self.queue = queue
541
542
543 class JobQueue(object):
544   """Queue used to manage the jobs.
545
546   @cvar _RE_JOB_FILE: regex matching the valid job file names
547
548   """
549   _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
550
551   def _RequireOpenQueue(fn):
552     """Decorator for "public" functions.
553
554     This function should be used for all 'public' functions. That is,
555     functions usually called from other classes.
556
557     @warning: Use this decorator only after utils.LockedMethod!
558
559     Example::
560       @utils.LockedMethod
561       @_RequireOpenQueue
562       def Example(self):
563         pass
564
565     """
566     def wrapper(self, *args, **kwargs):
567       assert self._queue_lock is not None, "Queue should be open"
568       return fn(self, *args, **kwargs)
569     return wrapper
570
571   def __init__(self, context):
572     """Constructor for JobQueue.
573
574     The constructor will initialize the job queue object and then
575     start loading the current jobs from disk, either for starting them
576     (if they were queue) or for aborting them (if they were already
577     running).
578
579     @type context: GanetiContext
580     @param context: the context object for access to the configuration
581         data and other ganeti objects
582
583     """
584     self.context = context
585     self._memcache = weakref.WeakValueDictionary()
586     self._my_hostname = utils.HostInfo().name
587
588     # Locking
589     self._lock = threading.Lock()
590     self.acquire = self._lock.acquire
591     self.release = self._lock.release
592
593     # Initialize
594     self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
595
596     # Read serial file
597     self._last_serial = jstore.ReadSerial()
598     assert self._last_serial is not None, ("Serial file was modified between"
599                                            " check in jstore and here")
600
601     # Get initial list of nodes
602     self._nodes = dict((n.name, n.primary_ip)
603                        for n in self.context.cfg.GetAllNodesInfo().values()
604                        if n.master_candidate)
605
606     # Remove master node
607     try:
608       del self._nodes[self._my_hostname]
609     except KeyError:
610       pass
611
612     # TODO: Check consistency across nodes
613
614     # Setup worker pool
615     self._wpool = _JobQueueWorkerPool(self)
616     try:
617       # We need to lock here because WorkerPool.AddTask() may start a job while
618       # we're still doing our work.
619       self.acquire()
620       try:
621         logging.info("Inspecting job queue")
622
623         all_job_ids = self._GetJobIDsUnlocked()
624         jobs_count = len(all_job_ids)
625         lastinfo = time.time()
626         for idx, job_id in enumerate(all_job_ids):
627           # Give an update every 1000 jobs or 10 seconds
628           if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
629               idx == (jobs_count - 1)):
630             logging.info("Job queue inspection: %d/%d (%0.1f %%)",
631                          idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
632             lastinfo = time.time()
633
634           job = self._LoadJobUnlocked(job_id)
635
636           # a failure in loading the job can cause 'None' to be returned
637           if job is None:
638             continue
639
640           status = job.CalcStatus()
641
642           if status in (constants.JOB_STATUS_QUEUED, ):
643             self._wpool.AddTask(job)
644
645           elif status in (constants.JOB_STATUS_RUNNING,
646                           constants.JOB_STATUS_WAITLOCK,
647                           constants.JOB_STATUS_CANCELING):
648             logging.warning("Unfinished job %s found: %s", job.id, job)
649             try:
650               job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
651                                     "Unclean master daemon shutdown")
652             finally:
653               self.UpdateJobUnlocked(job)
654
655         logging.info("Job queue inspection finished")
656       finally:
657         self.release()
658     except:
659       self._wpool.TerminateWorkers()
660       raise
661
662   @utils.LockedMethod
663   @_RequireOpenQueue
664   def AddNode(self, node):
665     """Register a new node with the queue.
666
667     @type node: L{objects.Node}
668     @param node: the node object to be added
669
670     """
671     node_name = node.name
672     assert node_name != self._my_hostname
673
674     # Clean queue directory on added node
675     result = rpc.RpcRunner.call_jobqueue_purge(node_name)
676     msg = result.fail_msg
677     if msg:
678       logging.warning("Cannot cleanup queue directory on node %s: %s",
679                       node_name, msg)
680
681     if not node.master_candidate:
682       # remove if existing, ignoring errors
683       self._nodes.pop(node_name, None)
684       # and skip the replication of the job ids
685       return
686
687     # Upload the whole queue excluding archived jobs
688     files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
689
690     # Upload current serial file
691     files.append(constants.JOB_QUEUE_SERIAL_FILE)
692
693     for file_name in files:
694       # Read file content
695       content = utils.ReadFile(file_name)
696
697       result = rpc.RpcRunner.call_jobqueue_update([node_name],
698                                                   [node.primary_ip],
699                                                   file_name, content)
700       msg = result[node_name].fail_msg
701       if msg:
702         logging.error("Failed to upload file %s to node %s: %s",
703                       file_name, node_name, msg)
704
705     self._nodes[node_name] = node.primary_ip
706
707   @utils.LockedMethod
708   @_RequireOpenQueue
709   def RemoveNode(self, node_name):
710     """Callback called when removing nodes from the cluster.
711
712     @type node_name: str
713     @param node_name: the name of the node to remove
714
715     """
716     try:
717       # The queue is removed by the "leave node" RPC call.
718       del self._nodes[node_name]
719     except KeyError:
720       pass
721
722   def _CheckRpcResult(self, result, nodes, failmsg):
723     """Verifies the status of an RPC call.
724
725     Since we aim to keep consistency should this node (the current
726     master) fail, we will log errors if our rpc fail, and especially
727     log the case when more than half of the nodes fails.
728
729     @param result: the data as returned from the rpc call
730     @type nodes: list
731     @param nodes: the list of nodes we made the call to
732     @type failmsg: str
733     @param failmsg: the identifier to be used for logging
734
735     """
736     failed = []
737     success = []
738
739     for node in nodes:
740       msg = result[node].fail_msg
741       if msg:
742         failed.append(node)
743         logging.error("RPC call %s failed on node %s: %s",
744                       result[node].call, node, msg)
745       else:
746         success.append(node)
747
748     # +1 for the master node
749     if (len(success) + 1) < len(failed):
750       # TODO: Handle failing nodes
751       logging.error("More than half of the nodes failed")
752
753   def _GetNodeIp(self):
754     """Helper for returning the node name/ip list.
755
756     @rtype: (list, list)
757     @return: a tuple of two lists, the first one with the node
758         names and the second one with the node addresses
759
760     """
761     name_list = self._nodes.keys()
762     addr_list = [self._nodes[name] for name in name_list]
763     return name_list, addr_list
764
765   def _WriteAndReplicateFileUnlocked(self, file_name, data):
766     """Writes a file locally and then replicates it to all nodes.
767
768     This function will replace the contents of a file on the local
769     node and then replicate it to all the other nodes we have.
770
771     @type file_name: str
772     @param file_name: the path of the file to be replicated
773     @type data: str
774     @param data: the new contents of the file
775
776     """
777     utils.WriteFile(file_name, data=data)
778
779     names, addrs = self._GetNodeIp()
780     result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
781     self._CheckRpcResult(result, self._nodes,
782                          "Updating %s" % file_name)
783
784   def _RenameFilesUnlocked(self, rename):
785     """Renames a file locally and then replicate the change.
786
787     This function will rename a file in the local queue directory
788     and then replicate this rename to all the other nodes we have.
789
790     @type rename: list of (old, new)
791     @param rename: List containing tuples mapping old to new names
792
793     """
794     # Rename them locally
795     for old, new in rename:
796       utils.RenameFile(old, new, mkdir=True)
797
798     # ... and on all nodes
799     names, addrs = self._GetNodeIp()
800     result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
801     self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
802
803   def _FormatJobID(self, job_id):
804     """Convert a job ID to string format.
805
806     Currently this just does C{str(job_id)} after performing some
807     checks, but if we want to change the job id format this will
808     abstract this change.
809
810     @type job_id: int or long
811     @param job_id: the numeric job id
812     @rtype: str
813     @return: the formatted job id
814
815     """
816     if not isinstance(job_id, (int, long)):
817       raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
818     if job_id < 0:
819       raise errors.ProgrammerError("Job ID %s is negative" % job_id)
820
821     return str(job_id)
822
823   @classmethod
824   def _GetArchiveDirectory(cls, job_id):
825     """Returns the archive directory for a job.
826
827     @type job_id: str
828     @param job_id: Job identifier
829     @rtype: str
830     @return: Directory name
831
832     """
833     return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
834
835   def _NewSerialsUnlocked(self, count):
836     """Generates a new job identifier.
837
838     Job identifiers are unique during the lifetime of a cluster.
839
840     @type count: integer
841     @param count: how many serials to return
842     @rtype: str
843     @return: a string representing the job identifier.
844
845     """
846     assert count > 0
847     # New number
848     serial = self._last_serial + count
849
850     # Write to file
851     self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
852                                         "%s\n" % serial)
853
854     result = [self._FormatJobID(v)
855               for v in range(self._last_serial, serial + 1)]
856     # Keep it only if we were able to write the file
857     self._last_serial = serial
858
859     return result
860
861   @staticmethod
862   def _GetJobPath(job_id):
863     """Returns the job file for a given job id.
864
865     @type job_id: str
866     @param job_id: the job identifier
867     @rtype: str
868     @return: the path to the job file
869
870     """
871     return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
872
873   @classmethod
874   def _GetArchivedJobPath(cls, job_id):
875     """Returns the archived job file for a give job id.
876
877     @type job_id: str
878     @param job_id: the job identifier
879     @rtype: str
880     @return: the path to the archived job file
881
882     """
883     path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
884     return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
885
886   @classmethod
887   def _ExtractJobID(cls, name):
888     """Extract the job id from a filename.
889
890     @type name: str
891     @param name: the job filename
892     @rtype: job id or None
893     @return: the job id corresponding to the given filename,
894         or None if the filename does not represent a valid
895         job file
896
897     """
898     m = cls._RE_JOB_FILE.match(name)
899     if m:
900       return m.group(1)
901     else:
902       return None
903
904   def _GetJobIDsUnlocked(self, archived=False):
905     """Return all known job IDs.
906
907     If the parameter archived is True, archived jobs IDs will be
908     included. Currently this argument is unused.
909
910     The method only looks at disk because it's a requirement that all
911     jobs are present on disk (so in the _memcache we don't have any
912     extra IDs).
913
914     @rtype: list
915     @return: the list of job IDs
916
917     """
918     jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
919     jlist = utils.NiceSort(jlist)
920     return jlist
921
922   def _ListJobFiles(self):
923     """Returns the list of current job files.
924
925     @rtype: list
926     @return: the list of job file names
927
928     """
929     return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
930             if self._RE_JOB_FILE.match(name)]
931
932   def _LoadJobUnlocked(self, job_id):
933     """Loads a job from the disk or memory.
934
935     Given a job id, this will return the cached job object if
936     existing, or try to load the job from the disk. If loading from
937     disk, it will also add the job to the cache.
938
939     @param job_id: the job id
940     @rtype: L{_QueuedJob} or None
941     @return: either None or the job object
942
943     """
944     job = self._memcache.get(job_id, None)
945     if job:
946       logging.debug("Found job %s in memcache", job_id)
947       return job
948
949     filepath = self._GetJobPath(job_id)
950     logging.debug("Loading job from %s", filepath)
951     try:
952       raw_data = utils.ReadFile(filepath)
953     except IOError, err:
954       if err.errno in (errno.ENOENT, ):
955         return None
956       raise
957
958     data = serializer.LoadJson(raw_data)
959
960     try:
961       job = _QueuedJob.Restore(self, data)
962     except Exception, err:
963       new_path = self._GetArchivedJobPath(job_id)
964       if filepath == new_path:
965         # job already archived (future case)
966         logging.exception("Can't parse job %s", job_id)
967       else:
968         # non-archived case
969         logging.exception("Can't parse job %s, will archive.", job_id)
970         self._RenameFilesUnlocked([(filepath, new_path)])
971       return None
972
973     self._memcache[job_id] = job
974     logging.debug("Added job %s to the cache", job_id)
975     return job
976
977   def _GetJobsUnlocked(self, job_ids):
978     """Return a list of jobs based on their IDs.
979
980     @type job_ids: list
981     @param job_ids: either an empty list (meaning all jobs),
982         or a list of job IDs
983     @rtype: list
984     @return: the list of job objects
985
986     """
987     if not job_ids:
988       job_ids = self._GetJobIDsUnlocked()
989
990     return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
991
992   @staticmethod
993   def _IsQueueMarkedDrain():
994     """Check if the queue is marked from drain.
995
996     This currently uses the queue drain file, which makes it a
997     per-node flag. In the future this can be moved to the config file.
998
999     @rtype: boolean
1000     @return: True of the job queue is marked for draining
1001
1002     """
1003     return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1004
1005   @staticmethod
1006   def SetDrainFlag(drain_flag):
1007     """Sets the drain flag for the queue.
1008
1009     This is similar to the function L{backend.JobQueueSetDrainFlag},
1010     and in the future we might merge them.
1011
1012     @type drain_flag: boolean
1013     @param drain_flag: Whether to set or unset the drain flag
1014
1015     """
1016     if drain_flag:
1017       utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
1018     else:
1019       utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1020     return True
1021
1022   @_RequireOpenQueue
1023   def _SubmitJobUnlocked(self, job_id, ops):
1024     """Create and store a new job.
1025
1026     This enters the job into our job queue and also puts it on the new
1027     queue, in order for it to be picked up by the queue processors.
1028
1029     @type job_id: job ID
1030     @param job_id: the job ID for the new job
1031     @type ops: list
1032     @param ops: The list of OpCodes that will become the new job.
1033     @rtype: job ID
1034     @return: the job ID of the newly created job
1035     @raise errors.JobQueueDrainError: if the job is marked for draining
1036
1037     """
1038     if self._IsQueueMarkedDrain():
1039       raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1040
1041     # Check job queue size
1042     size = len(self._ListJobFiles())
1043     if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
1044       # TODO: Autoarchive jobs. Make sure it's not done on every job
1045       # submission, though.
1046       #size = ...
1047       pass
1048
1049     if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1050       raise errors.JobQueueFull()
1051
1052     job = _QueuedJob(self, job_id, ops)
1053
1054     # Write to disk
1055     self.UpdateJobUnlocked(job)
1056
1057     logging.debug("Adding new job %s to the cache", job_id)
1058     self._memcache[job_id] = job
1059
1060     # Add to worker pool
1061     self._wpool.AddTask(job)
1062
1063     return job.id
1064
1065   @utils.LockedMethod
1066   @_RequireOpenQueue
1067   def SubmitJob(self, ops):
1068     """Create and store a new job.
1069
1070     @see: L{_SubmitJobUnlocked}
1071
1072     """
1073     job_id = self._NewSerialsUnlocked(1)[0]
1074     return self._SubmitJobUnlocked(job_id, ops)
1075
1076   @utils.LockedMethod
1077   @_RequireOpenQueue
1078   def SubmitManyJobs(self, jobs):
1079     """Create and store multiple jobs.
1080
1081     @see: L{_SubmitJobUnlocked}
1082
1083     """
1084     results = []
1085     all_job_ids = self._NewSerialsUnlocked(len(jobs))
1086     for job_id, ops in zip(all_job_ids, jobs):
1087       try:
1088         data = self._SubmitJobUnlocked(job_id, ops)
1089         status = True
1090       except errors.GenericError, err:
1091         data = str(err)
1092         status = False
1093       results.append((status, data))
1094
1095     return results
1096
1097   @_RequireOpenQueue
1098   def UpdateJobUnlocked(self, job):
1099     """Update a job's on disk storage.
1100
1101     After a job has been modified, this function needs to be called in
1102     order to write the changes to disk and replicate them to the other
1103     nodes.
1104
1105     @type job: L{_QueuedJob}
1106     @param job: the changed job
1107
1108     """
1109     filename = self._GetJobPath(job.id)
1110     data = serializer.DumpJson(job.Serialize(), indent=False)
1111     logging.debug("Writing job %s to %s", job.id, filename)
1112     self._WriteAndReplicateFileUnlocked(filename, data)
1113
1114     # Notify waiters about potential changes
1115     job.change.notifyAll()
1116
1117   @utils.LockedMethod
1118   @_RequireOpenQueue
1119   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1120                         timeout):
1121     """Waits for changes in a job.
1122
1123     @type job_id: string
1124     @param job_id: Job identifier
1125     @type fields: list of strings
1126     @param fields: Which fields to check for changes
1127     @type prev_job_info: list or None
1128     @param prev_job_info: Last job information returned
1129     @type prev_log_serial: int
1130     @param prev_log_serial: Last job message serial number
1131     @type timeout: float
1132     @param timeout: maximum time to wait
1133     @rtype: tuple (job info, log entries)
1134     @return: a tuple of the job information as required via
1135         the fields parameter, and the log entries as a list
1136
1137         if the job has not changed and the timeout has expired,
1138         we instead return a special value,
1139         L{constants.JOB_NOTCHANGED}, which should be interpreted
1140         as such by the clients
1141
1142     """
1143     logging.debug("Waiting for changes in job %s", job_id)
1144
1145     job_info = None
1146     log_entries = None
1147
1148     end_time = time.time() + timeout
1149     while True:
1150       delta_time = end_time - time.time()
1151       if delta_time < 0:
1152         return constants.JOB_NOTCHANGED
1153
1154       job = self._LoadJobUnlocked(job_id)
1155       if not job:
1156         logging.debug("Job %s not found", job_id)
1157         break
1158
1159       status = job.CalcStatus()
1160       job_info = self._GetJobInfoUnlocked(job, fields)
1161       log_entries = job.GetLogEntries(prev_log_serial)
1162
1163       # Serializing and deserializing data can cause type changes (e.g. from
1164       # tuple to list) or precision loss. We're doing it here so that we get
1165       # the same modifications as the data received from the client. Without
1166       # this, the comparison afterwards might fail without the data being
1167       # significantly different.
1168       job_info = serializer.LoadJson(serializer.DumpJson(job_info))
1169       log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
1170
1171       if status not in (constants.JOB_STATUS_QUEUED,
1172                         constants.JOB_STATUS_RUNNING,
1173                         constants.JOB_STATUS_WAITLOCK):
1174         # Don't even try to wait if the job is no longer running, there will be
1175         # no changes.
1176         break
1177
1178       if (prev_job_info != job_info or
1179           (log_entries and prev_log_serial != log_entries[0][0])):
1180         break
1181
1182       logging.debug("Waiting again")
1183
1184       # Release the queue lock while waiting
1185       job.change.wait(delta_time)
1186
1187     logging.debug("Job %s changed", job_id)
1188
1189     if job_info is None and log_entries is None:
1190       return None
1191     else:
1192       return (job_info, log_entries)
1193
1194   @utils.LockedMethod
1195   @_RequireOpenQueue
1196   def CancelJob(self, job_id):
1197     """Cancels a job.
1198
1199     This will only succeed if the job has not started yet.
1200
1201     @type job_id: string
1202     @param job_id: job ID of job to be cancelled.
1203
1204     """
1205     logging.info("Cancelling job %s", job_id)
1206
1207     job = self._LoadJobUnlocked(job_id)
1208     if not job:
1209       logging.debug("Job %s not found", job_id)
1210       return (False, "Job %s not found" % job_id)
1211
1212     job_status = job.CalcStatus()
1213
1214     if job_status not in (constants.JOB_STATUS_QUEUED,
1215                           constants.JOB_STATUS_WAITLOCK):
1216       logging.debug("Job %s is no longer waiting in the queue", job.id)
1217       return (False, "Job %s is no longer waiting in the queue" % job.id)
1218
1219     if job_status == constants.JOB_STATUS_QUEUED:
1220       self.CancelJobUnlocked(job)
1221       return (True, "Job %s canceled" % job.id)
1222
1223     elif job_status == constants.JOB_STATUS_WAITLOCK:
1224       # The worker will notice the new status and cancel the job
1225       try:
1226         job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1227       finally:
1228         self.UpdateJobUnlocked(job)
1229       return (True, "Job %s will be canceled" % job.id)
1230
1231   @_RequireOpenQueue
1232   def CancelJobUnlocked(self, job):
1233     """Marks a job as canceled.
1234
1235     """
1236     try:
1237       job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1238                             "Job canceled by request")
1239     finally:
1240       self.UpdateJobUnlocked(job)
1241
1242   @_RequireOpenQueue
1243   def _ArchiveJobsUnlocked(self, jobs):
1244     """Archives jobs.
1245
1246     @type jobs: list of L{_QueuedJob}
1247     @param jobs: Job objects
1248     @rtype: int
1249     @return: Number of archived jobs
1250
1251     """
1252     archive_jobs = []
1253     rename_files = []
1254     for job in jobs:
1255       if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1256                                   constants.JOB_STATUS_SUCCESS,
1257                                   constants.JOB_STATUS_ERROR):
1258         logging.debug("Job %s is not yet done", job.id)
1259         continue
1260
1261       archive_jobs.append(job)
1262
1263       old = self._GetJobPath(job.id)
1264       new = self._GetArchivedJobPath(job.id)
1265       rename_files.append((old, new))
1266
1267     # TODO: What if 1..n files fail to rename?
1268     self._RenameFilesUnlocked(rename_files)
1269
1270     logging.debug("Successfully archived job(s) %s",
1271                   ", ".join(job.id for job in archive_jobs))
1272
1273     return len(archive_jobs)
1274
1275   @utils.LockedMethod
1276   @_RequireOpenQueue
1277   def ArchiveJob(self, job_id):
1278     """Archives a job.
1279
1280     This is just a wrapper over L{_ArchiveJobsUnlocked}.
1281
1282     @type job_id: string
1283     @param job_id: Job ID of job to be archived.
1284     @rtype: bool
1285     @return: Whether job was archived
1286
1287     """
1288     logging.info("Archiving job %s", job_id)
1289
1290     job = self._LoadJobUnlocked(job_id)
1291     if not job:
1292       logging.debug("Job %s not found", job_id)
1293       return False
1294
1295     return self._ArchiveJobsUnlocked([job]) == 1
1296
1297   @utils.LockedMethod
1298   @_RequireOpenQueue
1299   def AutoArchiveJobs(self, age, timeout):
1300     """Archives all jobs based on age.
1301
1302     The method will archive all jobs which are older than the age
1303     parameter. For jobs that don't have an end timestamp, the start
1304     timestamp will be considered. The special '-1' age will cause
1305     archival of all jobs (that are not running or queued).
1306
1307     @type age: int
1308     @param age: the minimum age in seconds
1309
1310     """
1311     logging.info("Archiving jobs with age more than %s seconds", age)
1312
1313     now = time.time()
1314     end_time = now + timeout
1315     archived_count = 0
1316     last_touched = 0
1317
1318     all_job_ids = self._GetJobIDsUnlocked(archived=False)
1319     pending = []
1320     for idx, job_id in enumerate(all_job_ids):
1321       last_touched = idx
1322
1323       # Not optimal because jobs could be pending
1324       # TODO: Measure average duration for job archival and take number of
1325       # pending jobs into account.
1326       if time.time() > end_time:
1327         break
1328
1329       # Returns None if the job failed to load
1330       job = self._LoadJobUnlocked(job_id)
1331       if job:
1332         if job.end_timestamp is None:
1333           if job.start_timestamp is None:
1334             job_age = job.received_timestamp
1335           else:
1336             job_age = job.start_timestamp
1337         else:
1338           job_age = job.end_timestamp
1339
1340         if age == -1 or now - job_age[0] > age:
1341           pending.append(job)
1342
1343           # Archive 10 jobs at a time
1344           if len(pending) >= 10:
1345             archived_count += self._ArchiveJobsUnlocked(pending)
1346             pending = []
1347
1348     if pending:
1349       archived_count += self._ArchiveJobsUnlocked(pending)
1350
1351     return (archived_count, len(all_job_ids) - last_touched - 1)
1352
1353   def _GetJobInfoUnlocked(self, job, fields):
1354     """Returns information about a job.
1355
1356     @type job: L{_QueuedJob}
1357     @param job: the job which we query
1358     @type fields: list
1359     @param fields: names of fields to return
1360     @rtype: list
1361     @return: list with one element for each field
1362     @raise errors.OpExecError: when an invalid field
1363         has been passed
1364
1365     """
1366     row = []
1367     for fname in fields:
1368       if fname == "id":
1369         row.append(job.id)
1370       elif fname == "status":
1371         row.append(job.CalcStatus())
1372       elif fname == "ops":
1373         row.append([op.input.__getstate__() for op in job.ops])
1374       elif fname == "opresult":
1375         row.append([op.result for op in job.ops])
1376       elif fname == "opstatus":
1377         row.append([op.status for op in job.ops])
1378       elif fname == "oplog":
1379         row.append([op.log for op in job.ops])
1380       elif fname == "opstart":
1381         row.append([op.start_timestamp for op in job.ops])
1382       elif fname == "opend":
1383         row.append([op.end_timestamp for op in job.ops])
1384       elif fname == "received_ts":
1385         row.append(job.received_timestamp)
1386       elif fname == "start_ts":
1387         row.append(job.start_timestamp)
1388       elif fname == "end_ts":
1389         row.append(job.end_timestamp)
1390       elif fname == "lock_status":
1391         row.append(job.lock_status)
1392       elif fname == "summary":
1393         row.append([op.input.Summary() for op in job.ops])
1394       else:
1395         raise errors.OpExecError("Invalid job query field '%s'" % fname)
1396     return row
1397
1398   @utils.LockedMethod
1399   @_RequireOpenQueue
1400   def QueryJobs(self, job_ids, fields):
1401     """Returns a list of jobs in queue.
1402
1403     This is a wrapper of L{_GetJobsUnlocked}, which actually does the
1404     processing for each job.
1405
1406     @type job_ids: list
1407     @param job_ids: sequence of job identifiers or None for all
1408     @type fields: list
1409     @param fields: names of fields to return
1410     @rtype: list
1411     @return: list one element per job, each element being list with
1412         the requested fields
1413
1414     """
1415     jobs = []
1416
1417     for job in self._GetJobsUnlocked(job_ids):
1418       if job is None:
1419         jobs.append(None)
1420       else:
1421         jobs.append(self._GetJobInfoUnlocked(job, fields))
1422
1423     return jobs
1424
1425   @utils.LockedMethod
1426   @_RequireOpenQueue
1427   def Shutdown(self):
1428     """Stops the job queue.
1429
1430     This shutdowns all the worker threads an closes the queue.
1431
1432     """
1433     self._wpool.TerminateWorkers()
1434
1435     self._queue_lock.Close()
1436     self._queue_lock = None